This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8967738e93 HDDS-7492. Extend Placement Policy Interface to select
mis-replicated replicas to copy (#4006)
8967738e93 is described below
commit 8967738e93e174707dc6ad7ab7da7b0f904d09b8
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu Dec 8 05:45:42 2022 -0700
HDDS-7492. Extend Placement Policy Interface to select mis-replicated
replicas to copy (#4006)
---
.../apache/hadoop/hdds/scm/PlacementPolicy.java | 15 +-
.../hadoop/hdds/scm/SCMCommonPlacementPolicy.java | 103 +++++++--
.../ContainerPlacementStatusDefault.java | 36 +++-
.../algorithms/SCMContainerPlacementRackAware.java | 8 +
.../SCMContainerPlacementRackScatter.java | 77 ++++---
.../algorithms/SCMContainerPlacementRandom.java | 3 +-
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 8 +
.../org/apache/hadoop/hdds/scm/HddsTestUtils.java | 32 ++-
.../hdds/scm/TestSCMCommonPlacementPolicy.java | 240 ++++++++++++++++++++-
.../algorithms/TestContainerPlacementFactory.java | 12 +-
.../TestSCMContainerPlacementRackScatter.java | 56 +++--
.../org/apache/ozone/test/GenericTestUtils.java | 10 +
.../ozone/recon/fsck/TestContainerHealthTask.java | 10 +-
13 files changed, 521 insertions(+), 89 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
index b240e5c3b7..af41a15740 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
@@ -22,12 +22,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
/**
* A PlacementPolicy support choosing datanodes to build
* pipelines or containers with specified constraints.
*/
-public interface PlacementPolicy {
+public interface PlacementPolicy<Replica> {
default List<DatanodeDetails> chooseDatanodes(
List<DatanodeDetails> excludedNodes,
@@ -60,9 +61,17 @@ public interface PlacementPolicy {
* Given a list of datanode and the number of replicas required, return
* a PlacementPolicyStatus object indicating if the container meets the
* placement policy - ie is it on the correct number of racks, etc.
- * @param dns List of datanodes holding a replica of the container
+ @param dns List of datanodes holding a replica of the container
* @param replicas The expected number of replicas
*/
ContainerPlacementStatus validateContainerPlacement(
- List<DatanodeDetails> dns, int replicas);
+ List<DatanodeDetails> dns, int replicas);
+
+ /**
+ * Given a set of replicas of a container which are
+ * neither underreplicated nor overreplicated,
+ * return a set of replicas to copy to another node to fix misreplication.
+ * @param replicas the set of existing replicas of the container
+ */
+ Set<Replica> replicasToCopyToFixMisreplication(Set<Replica> replicas);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index 5f7e944235..535b03508d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -18,35 +18,40 @@
package org.apache.hadoop.hdds.scm;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.stream.Collectors;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
-
-import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* This policy implements a set of invariants which are common
* for all basic placement policies, acts as the repository of helper
* functions which are common to placement policies.
*/
-public abstract class SCMCommonPlacementPolicy implements PlacementPolicy {
+public abstract class SCMCommonPlacementPolicy implements
+ PlacementPolicy<ContainerReplica> {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMCommonPlacementPolicy.class);
@@ -346,6 +351,22 @@ public abstract class SCMCommonPlacementPolicy implements
PlacementPolicy {
return 1;
}
+ /**
+ * Default implementation to return the max number of replicas per rack.
+ * For simple policies that are not rack aware
+ * we return numReplicas, from this default implementation.
+ *
+ * @param numReplicas - The desired replica counts
+ * @param numberOfRacks - The desired number of racks
+ * @return The max number of replicas per rack
+ */
+ protected int getMaxReplicasPerRack(int numReplicas, int numberOfRacks) {
+ return numReplicas / numberOfRacks
+ + Math.min(numReplicas % numberOfRacks, 1);
+ }
+
+
+
/**
* This default implementation handles rack aware policies and non rack
* aware policies. If a future placement policy needs to check more than
racks
@@ -363,6 +384,7 @@ public abstract class SCMCommonPlacementPolicy implements
PlacementPolicy {
public ContainerPlacementStatus validateContainerPlacement(
List<DatanodeDetails> dns, int replicas) {
NetworkTopology topology = nodeManager.getClusterNetworkTopologyMap();
+ // We have a network topology so calculate if it is satisfied or not.
int requiredRacks = getRequiredRackCount(replicas);
if (topology == null || replicas == 1 || requiredRacks == 1) {
if (dns.size() > 0) {
@@ -372,22 +394,22 @@ public abstract class SCMCommonPlacementPolicy implements
PlacementPolicy {
return invalidPlacement;
}
}
- // We have a network topology so calculate if it is satisfied or not.
- int numRacks = 1;
+ Map<Node, Long> currentRackCount = dns.stream()
+ .collect(Collectors.groupingBy(this::getPlacementGroup,
+ Collectors.counting()));
final int maxLevel = topology.getMaxLevel();
// The leaf nodes are all at max level, so the number of nodes at
// leafLevel - 1 is the rack count
- numRacks = topology.getNumOfNodes(maxLevel - 1);
- final long currentRackCount = dns.stream()
- .map(d -> topology.getAncestor(d, 1))
- .distinct()
- .count();
-
+ int numRacks = topology.getNumOfNodes(maxLevel - 1);
if (replicas < requiredRacks) {
requiredRacks = replicas;
}
+ int maxReplicasPerRack = getMaxReplicasPerRack(replicas,
+ Math.min(requiredRacks, numRacks));
return new ContainerPlacementStatusDefault(
- (int)currentRackCount, requiredRacks, numRacks);
+ currentRackCount.size(), requiredRacks, numRacks, maxReplicasPerRack,
+ currentRackCount.values().stream().map(Long::intValue)
+ .collect(Collectors.toList()));
}
/**
@@ -426,4 +448,47 @@ public abstract class SCMCommonPlacementPolicy implements
PlacementPolicy {
}
return false;
}
+
+ /**
+ * Given a set of replicas of a container which are
+ * neither underreplicated nor overreplicated,
+ * return a set of replicas to copy to another node to fix misreplication.
+ * @param replicas the set of existing replicas of the container
+ */
+ @Override
+ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
+ Set<ContainerReplica> replicas) {
+ Map<Node, List<ContainerReplica>> placementGroupReplicaIdMap
+ = replicas.stream().collect(Collectors.groupingBy(replica ->
+ this.getPlacementGroup(replica.getDatanodeDetails())));
+
+ int totalNumberOfReplicas = replicas.size();
+ int requiredNumberOfPlacementGroups =
+ getRequiredRackCount(totalNumberOfReplicas);
+ Set<ContainerReplica> copyReplicaSet = Sets.newHashSet();
+ List<List<ContainerReplica>> replicaSet = placementGroupReplicaIdMap
+ .values().stream()
+ .sorted((o1, o2) -> Integer.compare(o2.size(), o1.size()))
+ .limit(requiredNumberOfPlacementGroups)
+ .collect(Collectors.toList());
+ for (List<ContainerReplica> replicaList: replicaSet) {
+ int maxReplicasPerPlacementGroup = getMaxReplicasPerRack(
+ totalNumberOfReplicas, requiredNumberOfPlacementGroups);
+ int numberOfReplicasToBeCopied = Math.max(0,
+ replicaList.size() - maxReplicasPerPlacementGroup);
+ totalNumberOfReplicas -= maxReplicasPerPlacementGroup;
+ requiredNumberOfPlacementGroups -= 1;
+ if (numberOfReplicasToBeCopied > 0) {
+ List<ContainerReplica> replicasToBeCopied = replicaList.stream()
+ .limit(numberOfReplicasToBeCopied)
+ .collect(Collectors.toList());
+ copyReplicaSet.addAll(replicasToBeCopied);
+ }
+ }
+ return copyReplicaSet;
+ }
+
+ protected Node getPlacementGroup(DatanodeDetails dn) {
+ return nodeManager.getClusterNetworkTopologyMap().getAncestor(dn, 1);
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java
index 3fdcd2f401..a0fb5db92d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java
@@ -18,6 +18,9 @@ package
org.apache.hadoop.hdds.scm.container.placement.algorithms;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import java.util.Collections;
+import java.util.List;
+
/**
* Simple Status object to check if a container is replicated across enough
* racks.
@@ -29,16 +32,32 @@ public class ContainerPlacementStatusDefault
private final int currentRacks;
private final int totalRacks;
+ private final int maxReplicasPerRack;
+ private final List<Integer> rackReplicaCnts;
+
+
public ContainerPlacementStatusDefault(int currentRacks, int requiredRacks,
- int totalRacks) {
+ int totalRacks, int maxReplicasPerRack, List<Integer> rackReplicaCnts) {
this.requiredRacks = requiredRacks;
this.currentRacks = currentRacks;
this.totalRacks = totalRacks;
+ this.maxReplicasPerRack = maxReplicasPerRack;
+ this.rackReplicaCnts = rackReplicaCnts;
+ }
+
+ public ContainerPlacementStatusDefault(int requiredRacks, int currentRacks,
+ int totalRacks) {
+ this(requiredRacks, currentRacks, totalRacks, 1,
+ currentRacks == 0 ? Collections.emptyList()
+ : Collections.nCopies(currentRacks, 1));
}
@Override
public boolean isPolicySatisfied() {
- return currentRacks >= totalRacks || currentRacks >= requiredRacks;
+ if (currentRacks < Math.min(totalRacks, requiredRacks)) {
+ return false;
+ }
+ return rackReplicaCnts.stream().allMatch(cnt -> cnt <= maxReplicasPerRack);
}
@Override
@@ -46,8 +65,13 @@ public class ContainerPlacementStatusDefault
if (isPolicySatisfied()) {
return null;
}
- return "The container is mis-replicated as it is on " + currentRacks +
- " racks but should be on " + requiredRacks + " racks.";
+ if (currentRacks < Math.min(requiredRacks, totalRacks)) {
+ return "The container is mis-replicated as it is on " + currentRacks +
+ " racks but should be on " + requiredRacks + " racks.";
+ }
+ return "The container is mis-replicated as max number of replicas per rack
"
+ + "is " + maxReplicasPerRack + " but number of replicas per rack" +
+ " are " + rackReplicaCnts.toString();
}
@Override
@@ -55,7 +79,9 @@ public class ContainerPlacementStatusDefault
if (isPolicySatisfied()) {
return 0;
}
- return requiredRacks - currentRacks;
+ return Math.max(requiredRacks - currentRacks,
+ rackReplicaCnts.stream().mapToInt(
+ cnt -> Math.max(maxReplicasPerRack - cnt, 0)).sum());
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
index 4ee7408fbb..4f07024f16 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
@@ -376,6 +376,14 @@ public final class SCMContainerPlacementRackAware
}
}
+ @Override
+ protected int getMaxReplicasPerRack(int numReplicas, int numberOfRacks) {
+ if (numberOfRacks == 1) {
+ return numReplicas;
+ }
+ return Math.max(numReplicas - 1, 1);
+ }
+
@Override
protected int getRequiredRackCount(int numReplicas) {
return REQUIRED_RACKS;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index eff2dc86c4..2f86df8708 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
@@ -35,7 +36,10 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -78,13 +82,19 @@ public final class SCMContainerPlacementRackScatter
}
public Set<DatanodeDetails> chooseNodesFromRacks(List<Node> racks,
- List<Node> unavailableNodes,
- List<DatanodeDetails> mutableFavoredNodes,
- int nodesRequired, final long metadataSizeRequired,
- final long dataSizeRequired, int maxOuterLoopIterations) {
+ List<Node> unavailableNodes,
+ List<DatanodeDetails> mutableFavoredNodes,
+ int nodesRequired, final Pair<Long, Long> metadatasizeDatasizePair,
+ int maxOuterLoopIterations, final Pair<Map<Node, Integer>, Integer>
+ rackCntMapMaxReplicaPerRackPair) {
if (nodesRequired <= 0) {
return Collections.emptySet();
}
+ final long metadataSizeRequired = metadatasizeDatasizePair.getKey();
+ final long dataSizeRequired = metadatasizeDatasizePair.getValue();
+ final Map<Node, Integer> rackCntMap =
+ rackCntMapMaxReplicaPerRackPair.getKey();
+ final int maxReplicasPerRack = rackCntMapMaxReplicaPerRackPair.getValue();
List<Node> toChooseRacks = new LinkedList<>();
Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>();
Set<Node> skippedRacks = new HashSet<>();
@@ -97,8 +107,10 @@ public final class SCMContainerPlacementRackScatter
int chosenListSize = chosenNodes.size();
// Refill toChooseRacks, we put skippedRacks in front of toChooseRacks
- // for a even distribution
- toChooseRacks.addAll(racks);
+ // for an even distribution
+ toChooseRacks.addAll(racks.stream()
+ .filter(rack -> rackCntMap.getOrDefault(rack, 0)
+ < maxReplicasPerRack).collect(Collectors.toList()));
if (!skippedRacks.isEmpty()) {
toChooseRacks.removeAll(skippedRacks);
toChooseRacks.addAll(0, skippedRacks);
@@ -111,6 +123,7 @@ public final class SCMContainerPlacementRackScatter
Node curRack = getRackOfDatanodeDetails(favoredNode);
if (toChooseRacks.contains(curRack)) {
chosenNodes.add(favoredNode);
+ rackCntMap.merge(curRack, 1, Math::addExact);
toChooseRacks.remove(curRack);
chosenFavoredNodesInForLoop.add(favoredNode);
unavailableNodes.add(favoredNode);
@@ -137,6 +150,7 @@ public final class SCMContainerPlacementRackScatter
metadataSizeRequired, dataSizeRequired);
if (node != null) {
chosenNodes.add((DatanodeDetails) node);
+ rackCntMap.merge(rack, 1, Math::addExact);
mutableFavoredNodes.remove(node);
unavailableNodes.add(node);
nodesRequired--;
@@ -208,7 +222,6 @@ public final class SCMContainerPlacementRackScatter
" ExcludedNode = " + excludedNodesCount,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
-
List<DatanodeDetails> mutableFavoredNodes = new ArrayList<>();
if (favoredNodes != null) {
// Generate mutableFavoredNodes, only stores valid favoredNodes
@@ -228,14 +241,15 @@ public final class SCMContainerPlacementRackScatter
usedNodes = Collections.emptyList();
}
List<Node> racks = getAllRacks();
- Set<Node> usedRacks = usedNodes.stream()
+ Map<Node, Integer> usedRacksCntMap = usedNodes.stream()
.map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
.filter(node -> node != null)
- .collect(Collectors.toSet());
+ .collect(Collectors.toMap(Function.identity(), e -> 1,
+ Math::addExact));
int requiredReplicationFactor = usedNodes.size() + nodesRequired;
- int numberOfRacksRequired =
- getRequiredRackCount(requiredReplicationFactor);
- int additionalRacksRequired = numberOfRacksRequired - usedRacks.size();
+ int numberOfRacksRequired =
getRequiredRackCount(requiredReplicationFactor);
+ int additionalRacksRequired =
+ numberOfRacksRequired - usedRacksCntMap.size();
if (nodesRequired < additionalRacksRequired) {
String reason = "Required nodes size: " + nodesRequired
+ " is less than required number of racks to choose: "
@@ -243,12 +257,15 @@ public final class SCMContainerPlacementRackScatter
LOG.warn("Placement policy cannot choose the enough racks. {}"
+ "Total number of Required Racks: {} Used Racks Count:"
+
" {}, Required Nodes count: {}",
- reason, numberOfRacksRequired, usedRacks.size(), nodesRequired);
+ reason, numberOfRacksRequired, usedRacksCntMap.size(),
+ nodesRequired);
throw new SCMException(reason,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
+ int maxReplicasPerRack = getMaxReplicasPerRack(requiredReplicationFactor,
+ numberOfRacksRequired);
// For excluded nodes, we sort their racks at rear
- racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacks);
+ racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
List<Node> unavailableNodes = new ArrayList<>();
if (excludedNodes != null) {
@@ -265,14 +282,16 @@ public final class SCMContainerPlacementRackScatter
LOG.warn("Placement policy cannot choose the enough racks. {}"
+ "Total number of Required Racks: {} Used Racks Count:"
+
" {}, Required Nodes count: {}",
- reason, numberOfRacksRequired, usedRacks.size(), nodesRequired);
+ reason, numberOfRacksRequired, usedRacksCntMap.size(),
+ nodesRequired);
throw new SCMException(reason,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
mutableFavoredNodes, additionalRacksRequired,
- metadataSizeRequired, dataSizeRequired, 1));
+ Pair.of(metadataSizeRequired, dataSizeRequired), 1,
+ Pair.of(usedRacksCntMap, maxReplicasPerRack)));
if (chosenNodes.size() < additionalRacksRequired) {
String reason = "Chosen nodes size from Unique Racks: " + chosenNodes
@@ -288,16 +307,13 @@ public final class SCMContainerPlacementRackScatter
}
if (chosenNodes.size() < nodesRequired) {
- racks.addAll(usedRacks);
- usedRacks.addAll(chosenNodes.stream()
- .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
- .filter(node -> node != null)
- .collect(Collectors.toSet()));
- sortRackWithExcludedNodes(racks, excludedNodes, usedRacks);
- racks.addAll(usedRacks);
+ racks.addAll(usedRacksCntMap.keySet());
+ racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
+ racks.addAll(usedRacksCntMap.keySet());
chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
mutableFavoredNodes, nodesRequired - chosenNodes.size(),
- metadataSizeRequired, dataSizeRequired, Integer.MAX_VALUE));
+ Pair.of(metadataSizeRequired, dataSizeRequired),
+ Integer.MAX_VALUE, Pair.of(usedRacksCntMap,
maxReplicasPerRack)));
}
List<DatanodeDetails> result = new ArrayList<>(chosenNodes);
@@ -319,9 +335,8 @@ public final class SCMContainerPlacementRackScatter
.flatMap(List::stream).collect(Collectors.toList()),
requiredReplicationFactor);
if (!placementStatus.isPolicySatisfied()) {
- String errorMsg = "ContainerPlacementPolicy not met, currentRacks is " +
- placementStatus.actualPlacementCount() + " desired racks is " +
- placementStatus.expectedPlacementCount();
+ String errorMsg = "ContainerPlacementPolicy not met. Misreplication" +
+ " Reason: " + placementStatus.misReplicatedReason();
throw new SCMException(errorMsg, null);
}
return result;
@@ -416,7 +431,7 @@ public final class SCMContainerPlacementRackScatter
* @return
*/
private List<Node> sortRackWithExcludedNodes(List<Node> racks,
- List<DatanodeDetails> excludedNodes, Set<Node> usedRacks) {
+ List<DatanodeDetails> excludedNodes, Map<Node, Integer> usedRacks) {
if ((excludedNodes == null || excludedNodes.isEmpty())
&& usedRacks.isEmpty()) {
return racks;
@@ -425,12 +440,12 @@ public final class SCMContainerPlacementRackScatter
.map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
// Dead Nodes have been removed from the topology and so have a
// null rack. We need to exclude those from the rack list.
- .filter(node -> node != null)
- .filter(node -> !usedRacks.contains(node))
+ .filter(Objects::nonNull)
+ .filter(node -> !usedRacks.containsKey(node))
.collect(Collectors.toSet());
List <Node> result = new ArrayList<>();
for (Node rack : racks) {
- if (!usedRacks.contains(rack) && !lessPreferredRacks.contains(rack)) {
+ if (!usedRacks.containsKey(rack) && !lessPreferredRacks.contains(rack)) {
result.add(rack);
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
index cdfd57d1d0..2aa1121101 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -40,7 +41,7 @@ import java.util.List;
* can be practically used.
*/
public final class SCMContainerPlacementRandom extends SCMCommonPlacementPolicy
- implements PlacementPolicy {
+ implements PlacementPolicy<ContainerReplica> {
@VisibleForTesting
public static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index befc0543a3..2287475d82 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -105,6 +105,14 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
&& !p.isClosed();
}
+ @Override
+ protected int getMaxReplicasPerRack(int numReplicas, int numberOfRacks) {
+ if (numberOfRacks == 1) {
+ return numReplicas;
+ }
+ return Math.max(numReplicas - 1, 1);
+ }
+
/**
* Filter out viable nodes based on
* 1. nodes that are healthy
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
index 9950367972..eb0741662c 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -84,7 +85,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -691,11 +691,20 @@ public final class HddsTestUtils {
}
public static Set<ContainerReplica> getReplicas(
+ final ContainerID containerId,
+ final ContainerReplicaProto.State state,
+ final long sequenceId,
+ final DatanodeDetails... datanodeDetails) {
+ return Sets.newHashSet(getReplicas(containerId, state, sequenceId,
+ Arrays.asList(datanodeDetails)));
+ }
+
+ public static List<ContainerReplica> getReplicas(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final long sequenceId,
- final DatanodeDetails... datanodeDetails) {
- Set<ContainerReplica> replicas = new HashSet<>();
+ final Iterable<DatanodeDetails> datanodeDetails) {
+ List<ContainerReplica> replicas = new ArrayList<>();
for (DatanodeDetails datanode : datanodeDetails) {
replicas.add(getReplicas(containerId, state,
sequenceId, datanode.getUuid(), datanode));
@@ -744,14 +753,14 @@ public final class HddsTestUtils {
return builder.build();
}
- public static Set<ContainerReplica> getReplicasWithReplicaIndex(
+ public static List<ContainerReplica> getReplicasWithReplicaIndex(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final long usedBytes,
final long keyCount,
final long sequenceId,
- final DatanodeDetails... datanodeDetails) {
- Set<ContainerReplica> replicas = new HashSet<>();
+ final Iterable<DatanodeDetails> datanodeDetails) {
+ List<ContainerReplica> replicas = new ArrayList<>();
int replicaIndex = 1;
for (DatanodeDetails datanode : datanodeDetails) {
replicas.add(getReplicaBuilder(containerId, state,
@@ -762,6 +771,17 @@ public final class HddsTestUtils {
return replicas;
}
+ public static Set<ContainerReplica> getReplicasWithReplicaIndex(
+ final ContainerID containerId,
+ final ContainerReplicaProto.State state,
+ final long usedBytes,
+ final long keyCount,
+ final long sequenceId,
+ final DatanodeDetails... datanodeDetails) {
+ return Sets.newHashSet(getReplicasWithReplicaIndex(containerId, state,
+ usedBytes, keyCount, sequenceId, Arrays.asList(datanodeDetails)));
+ }
+
public static Pipeline getRandomPipeline() {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
index d11ca91b36..885e474749 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
@@ -18,21 +18,35 @@
package org.apache.hadoop.hdds.scm;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
/**
* Test functions of SCMCommonPlacementPolicy.
@@ -51,25 +65,237 @@ public class TestSCMCommonPlacementPolicy {
@Test
public void testGetResultSet() throws SCMException {
DummyPlacementPolicy dummyPlacementPolicy =
- new DummyPlacementPolicy(nodeManager, conf);
- List<DatanodeDetails> list =
- nodeManager.getNodes(NodeStatus.inServiceHealthy());
+ new DummyPlacementPolicy(nodeManager, conf, 5);
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
List<DatanodeDetails> result = dummyPlacementPolicy.getResultSet(3, list);
Set<DatanodeDetails> resultSet = new HashSet<>(result);
Assertions.assertNotEquals(1, resultSet.size());
}
+ private void testReplicasToFixMisreplication(
+ List<ContainerReplica> replicas,
+ DummyPlacementPolicy placementPolicy,
+ int expectedNumberOfReplicasToCopy,
+ Map<Node, Integer> expectedNumberOfCopyOperationFromRack) {
+ Set<ContainerReplica> replicasToCopy = placementPolicy
+ .replicasToCopyToFixMisreplication(Sets.newHashSet(replicas));
+ Assertions.assertEquals(expectedNumberOfReplicasToCopy,
+ replicasToCopy.size());
+ Map<Node, Long> rackCopyMap =
+ replicasToCopy.stream().collect(Collectors.groupingBy(
+ replica -> placementPolicy
+ .getPlacementGroup(replica.getDatanodeDetails()),
+ Collectors.counting()));
+ Set<Node> racks = replicas.stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .map(placementPolicy::getPlacementGroup)
+ .collect(Collectors.toSet());
+ for (Node rack: racks) {
+ Assertions.assertEquals(
+ expectedNumberOfCopyOperationFromRack.getOrDefault(rack, 0),
+ rackCopyMap.getOrDefault(rack, 0L).intValue());
+ }
+ }
+
+ @Test
+ public void testReplicasToFixMisreplicationWithOneMisreplication() {
+ DummyPlacementPolicy dummyPlacementPolicy =
+ new DummyPlacementPolicy(nodeManager, conf, 5);
+ List<Node> racks = dummyPlacementPolicy.racks;
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
+ List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 5)
+ .map(list::get).collect(Collectors.toList());
+ List<ContainerReplica> replicas =
+ HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+ CLOSED, 0, 0, 0, replicaDns);
+ testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 1,
+ ImmutableMap.of(racks.get(0), 1));
+ }
+
+ @Test
+ public void testReplicasToFixMisreplicationWithTwoMisreplication() {
+ DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+ nodeManager, conf,
+ GenericTestUtils.getReverseMap(
+ ImmutableMap.of(0, ImmutableList.of(0, 1, 5),
+ 1, ImmutableList.of(6),
+ 2, ImmutableList.of(2, 7),
+ 3, ImmutableList.of(3, 8),
+ 4, ImmutableList.of(4, 9))), 5);
+ List<Node> racks = dummyPlacementPolicy.racks;
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
+ List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 5)
+ .map(list::get).collect(Collectors.toList());
+ List<ContainerReplica> replicas =
+ HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+ CLOSED, 0, 0, 0, replicaDns);
+ testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 2,
+ ImmutableMap.of(racks.get(0), 2));
+ }
+
+ @Test
+ public void testReplicasToFixMisreplicationWithThreeMisreplication() {
+ DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+ nodeManager, conf,
+ GenericTestUtils.getReverseMap(
+ ImmutableMap.of(0, ImmutableList.of(0, 1, 2, 5),
+ 1, ImmutableList.of(6),
+ 2, ImmutableList.of(7),
+ 3, ImmutableList.of(3, 8),
+ 4, ImmutableList.of(4, 9))), 5);
+ List<Node> racks = dummyPlacementPolicy.racks;
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
+ List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 5)
+ .map(list::get).collect(Collectors.toList());
+ List<ContainerReplica> replicas =
+ HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+ CLOSED, 0, 0, 0, replicaDns);
+ testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 3,
+ ImmutableMap.of(racks.get(0), 3));
+ }
+
+ @Test
+ public void
+ testReplicasToFixMisreplicationWithThreeMisreplicationOnDifferentRack() {
+ DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+ nodeManager, conf,
+ GenericTestUtils.getReverseMap(
+ ImmutableMap.of(0, ImmutableList.of(0, 1, 2, 5),
+ 1, ImmutableList.of(6),
+ 2, ImmutableList.of(7),
+ 3, ImmutableList.of(3, 4, 8),
+ 4, ImmutableList.of(9))), 5);
+ List<Node> racks = dummyPlacementPolicy.racks;
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
+ List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 4)
+ .map(list::get).collect(Collectors.toList());
+ //Creating Replicas without replica Index
+ List<ContainerReplica> replicas = HddsTestUtils
+ .getReplicas(new ContainerID(1), CLOSED, 0, replicaDns);
+ testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 3,
+ ImmutableMap.of(racks.get(0), 2, racks.get(3), 1));
+ }
+
+ @Test
+ public void
+ testReplicasToFixMisreplicationWithReplicationFactorLessThanNumberOfRack(
+ ) {
+ DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+ nodeManager, conf,
+ GenericTestUtils.getReverseMap(
+ ImmutableMap.of(0, ImmutableList.of(0, 1, 5),
+ 1, ImmutableList.of(6),
+ 2, ImmutableList.of(2, 7),
+ 3, ImmutableList.of(3, 4, 8),
+ 4, ImmutableList.of(9))), 5);
+ List<Node> racks = dummyPlacementPolicy.racks;
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
+ List<DatanodeDetails> replicaDns = Stream.of(0, 1, 3, 4)
+ .map(list::get).collect(Collectors.toList());
+ //Creating Replicas without replica Index for replicas < number of racks
+ List<ContainerReplica> replicas = HddsTestUtils
+ .getReplicas(new ContainerID(1), CLOSED, 0, replicaDns);
+ testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 2,
+ ImmutableMap.of(racks.get(0), 1, racks.get(3), 1));
+ }
+
+ @Test
+ public void
+ testReplicasToFixMisreplicationWithReplicationFactorMoreThanNumberOfRack(
+ ) {
+ DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+ nodeManager, conf,
+ GenericTestUtils.getReverseMap(
+ ImmutableMap.of(0, ImmutableList.of(0, 1, 2, 5),
+ 1, ImmutableList.of(6),
+ 2, ImmutableList.of(7),
+ 3, ImmutableList.of(3, 4, 8),
+ 4, ImmutableList.of(9))), 5);
+ List<Node> racks = dummyPlacementPolicy.racks;
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
+ List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 4, 6)
+ .map(list::get).collect(Collectors.toList());
+ //Creating Replicas without replica Index for replicas >number of racks
+ List<ContainerReplica> replicas = HddsTestUtils
+ .getReplicas(new ContainerID(1), CLOSED, 0, replicaDns);
+ testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 2,
+ ImmutableMap.of(racks.get(0), 1, racks.get(3), 1));
+ }
+
+ @Test
+ public void testReplicasToFixMisreplicationMaxReplicaPerRack() {
+ DummyPlacementPolicy dummyPlacementPolicy =
+ new DummyPlacementPolicy(nodeManager, conf, 2);
+ List<Node> racks = dummyPlacementPolicy.racks;
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
+ List<DatanodeDetails> replicaDns = Stream.of(0, 2, 4, 6, 8)
+ .map(list::get).collect(Collectors.toList());
+ List<ContainerReplica> replicas =
+ HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+ CLOSED, 0, 0, 0, replicaDns);
+ testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 2,
+ ImmutableMap.of(racks.get(0), 2));
+ }
+
+ @Test
+ public void testReplicasWithoutMisreplication() {
+ DummyPlacementPolicy dummyPlacementPolicy =
+ new DummyPlacementPolicy(nodeManager, conf, 5);
+ List<DatanodeDetails> list = nodeManager.getAllNodes();
+ List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 4)
+ .map(list::get).collect(Collectors.toList());
+ List<ContainerReplica> replicas =
+ HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+ CLOSED, 0, 0, 0, replicaDns);
+
+ Set<ContainerReplica> replicasToCopy = dummyPlacementPolicy
+ .replicasToCopyToFixMisreplication(Sets.newHashSet(replicas));
+ Assertions.assertEquals(0, replicasToCopy.size());
+ }
+
+
+
private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
+ private Map<DatanodeDetails, Node> rackMap;
+ private List<Node> racks;
+ private int rackCnt;
+
+ DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
+ int rackCnt) {
+ this(nodeManager, conf,
+ IntStream.range(0, nodeManager.getAllNodes().size()).boxed()
+ .collect(Collectors.toMap(Function.identity(),
+ idx -> idx % rackCnt)), rackCnt);
+ }
- DummyPlacementPolicy(
- NodeManager nodeManager,
- ConfigurationSource conf) {
+ DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
+ Map<Integer, Integer> datanodeRackMap, int rackCnt) {
super(nodeManager, conf);
+ this.rackCnt = rackCnt;
+ this.racks = IntStream.range(0, rackCnt)
+ .mapToObj(i -> Mockito.mock(Node.class)).collect(Collectors.toList());
+ List<DatanodeDetails> datanodeDetails = nodeManager.getAllNodes();
+ rackMap = datanodeRackMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ entry -> datanodeDetails.get(entry.getKey()),
+ entry -> racks.get(entry.getValue())));
}
+
+
@Override
public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
return healthyNodes.get(0);
}
+
+ @Override
+ public Node getPlacementGroup(DatanodeDetails dn) {
+ return rackMap.get(dn);
+ }
+
+ @Override
+ protected int getRequiredRackCount(int numReplicas) {
+ return Math.min(numReplicas, rackCnt);
+ }
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index c35cb2b455..254a20e480 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -19,7 +19,9 @@ package
org.apache.hadoop.hdds.scm.container.placement.algorithms;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
@@ -176,7 +179,8 @@ public class TestContainerPlacementFactory {
/**
* A dummy container placement implementation for test.
*/
- public static class DummyImpl implements PlacementPolicy {
+ public static class DummyImpl implements
+ PlacementPolicy<ContainerReplica> {
@Override
public List<DatanodeDetails> chooseDatanodes(
List<DatanodeDetails> usedNodes,
@@ -191,6 +195,12 @@ public class TestContainerPlacementFactory {
validateContainerPlacement(List<DatanodeDetails> dns, int replicas) {
return new ContainerPlacementStatusDefault(1, 1, 1);
}
+
+ @Override
+ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
+ Set<ContainerReplica> replicas) {
+ return Collections.emptySet();
+ }
}
@Test
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
index 46bf8031ef..b49037e583 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
@@ -58,6 +58,7 @@ import java.util.stream.Stream;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
@@ -245,20 +246,34 @@ public class TestSCMContainerPlacementRackScatter {
nodeNum = 5;
if (datanodeCount > nodeNum) {
assumeTrue(datanodeCount >= NODE_PER_RACK);
- datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
- Assertions.assertEquals(nodeNum, datanodeDetails.size());
- Assertions.assertEquals(getRackSize(datanodeDetails),
- Math.min(nodeNum, rackNum));
+ if (datanodeCount == 6) {
+ int finalNodeNum = nodeNum;
+ SCMException e = assertThrows(SCMException.class,
+ () -> policy.chooseDatanodes(null, null, finalNodeNum, 0, 15));
+ assertEquals(FAILED_TO_FIND_HEALTHY_NODES, e.getResult());
+ } else {
+ datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+ Assertions.assertEquals(nodeNum, datanodeDetails.size());
+ Assertions.assertEquals(getRackSize(datanodeDetails),
+ Math.min(nodeNum, rackNum));
+ }
}
// 10 replicas
nodeNum = 10;
if (datanodeCount > nodeNum) {
assumeTrue(datanodeCount > 2 * NODE_PER_RACK);
- datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
- Assertions.assertEquals(nodeNum, datanodeDetails.size());
- Assertions.assertEquals(getRackSize(datanodeDetails),
- Math.min(nodeNum, rackNum));
+ if (datanodeCount == 11) {
+ int finalNodeNum = nodeNum;
+ SCMException e = assertThrows(SCMException.class,
+ () -> policy.chooseDatanodes(null, null, finalNodeNum, 0, 15));
+ assertEquals(FAILED_TO_FIND_HEALTHY_NODES, e.getResult());
+ } else {
+ datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+ Assertions.assertEquals(nodeNum, datanodeDetails.size());
+ Assertions.assertEquals(getRackSize(datanodeDetails),
+ Math.min(nodeNum, rackNum));
+ }
}
}
@@ -314,11 +329,20 @@ public class TestSCMContainerPlacementRackScatter {
totalNum = 5;
excludedNodes.clear();
excludedNodes.add(datanodes.get(0));
- datanodeDetails = policy.chooseDatanodes(
- excludedNodes, null, nodeNum, 0, 15);
- Assertions.assertEquals(nodeNum, datanodeDetails.size());
- Assertions.assertEquals(getRackSize(datanodeDetails, excludedNodes),
- Math.min(totalNum, rackNum));
+ if (datanodeCount == 6) {
+ int finalNodeNum = nodeNum;
+ SCMException e = assertThrows(SCMException.class,
+ () -> policy.chooseDatanodes(excludedNodes, null,
+ finalNodeNum, 0, 15));
+ assertEquals(FAILED_TO_FIND_HEALTHY_NODES, e.getResult());
+ } else {
+ datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, null, nodeNum, 0, 15);
+ Assertions.assertEquals(nodeNum, datanodeDetails.size());
+ Assertions.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+ Math.min(totalNum, rackNum));
+ }
+
// 5 replicas, two existing datanodes on different rack
nodeNum = 3;
@@ -344,7 +368,9 @@ public class TestSCMContainerPlacementRackScatter {
SCMException e = assertThrows(SCMException.class,
() -> policy.chooseDatanodes(excludedNodes, null, 3, 0, 15));
String message = e.getMessage();
- assumeTrue(message.contains("ContainerPlacementPolicy not met"));
+ assertTrue(message.contains("Chosen nodes size from Unique Racks: 1," +
+ " but required nodes to choose from Unique Racks: " +
+ "2 do not match."));
} else {
datanodeDetails = policy.chooseDatanodes(
excludedNodes, null, nodeNum, 0, 15);
@@ -567,7 +593,7 @@ public class TestSCMContainerPlacementRackScatter {
assertEquals("Chosen nodes size from Unique Racks: 1, but required " +
"nodes to choose from Unique Racks: 2 do not match.",
exception.getMessage());
- assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES,
+ assertEquals(FAILED_TO_FIND_HEALTHY_NODES,
exception.getResult());
}
diff --git
a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
index e03f0a7ffe..771f5137a4 100644
---
a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
+++
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
@@ -25,6 +25,8 @@ import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Preconditions;
@@ -32,6 +34,7 @@ import com.google.common.base.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
@@ -43,6 +46,7 @@ import org.junit.Assert;
import org.mockito.Mockito;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertTrue;
@@ -283,6 +287,12 @@ public abstract class GenericTestUtils {
return value;
}
+ public static <K, V> Map<V, K> getReverseMap(Map<K, List<V>> map) {
+ return map.entrySet().stream().flatMap(entry -> entry.getValue().stream()
+ .map(v -> Pair.of(v, entry.getKey())))
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ }
+
/**
* Class to capture logs for doing assertions.
*/
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index d60bbe9801..27de4c743a 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -343,7 +343,8 @@ public class TestContainerHealthTask extends
AbstractReconSqlDBTest {
* of a datanode via setMisRepWhenDnPresent. If a DN with that UUID is passed
* to validateContainerPlacement, then it will return an invalid placement.
*/
- private static class MockPlacementPolicy implements PlacementPolicy {
+ private static class MockPlacementPolicy implements
+ PlacementPolicy<ContainerReplica> {
private UUID misRepWhenDnPresent = null;
@@ -370,6 +371,13 @@ public class TestContainerHealthTask extends
AbstractReconSqlDBTest {
}
}
+ @Override
+ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
+ Set<ContainerReplica> replicas) {
+ return Collections.emptySet();
+ }
+
+
private boolean isDnPresent(List<DatanodeDetails> dns) {
for (DatanodeDetails dn : dns) {
if (misRepWhenDnPresent != null
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]