This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 08d8521060 HDDS-7822. Allow multiple commands per datanode in
UnhealthyReplicationHandler (#4203)
08d8521060 is described below
commit 08d8521060612a9a8ed7ae9aa4dabbd387cc4920
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed Jan 25 10:20:31 2023 +0100
HDDS-7822. Allow multiple commands per datanode in
UnhealthyReplicationHandler (#4203)
---
.../AbstractOverReplicationHandler.java | 20 --------
.../replication/ECOverReplicationHandler.java | 18 ++++---
.../replication/ECUnderReplicationHandler.java | 24 ++++-----
.../replication/MisReplicationHandler.java | 18 +++----
.../replication/OverReplicatedProcessor.java | 5 +-
.../replication/RatisOverReplicationHandler.java | 20 ++++----
.../replication/RatisUnderReplicationHandler.java | 28 +++++------
.../container/replication/ReplicationManager.java | 5 +-
.../replication/UnderReplicatedProcessor.java | 5 +-
.../replication/UnhealthyReplicationHandler.java | 5 +-
.../replication/UnhealthyReplicationProcessor.java | 13 ++---
.../replication/TestECOverReplicationHandler.java | 17 +++----
.../replication/TestECUnderReplicationHandler.java | 58 +++++++++++-----------
.../replication/TestMisReplicationHandler.java | 14 +++---
.../replication/TestOverReplicatedProcessor.java | 9 ++--
.../TestRatisOverReplicationHandler.java | 25 ++++++----
.../TestRatisUnderReplicationHandler.java | 3 +-
.../replication/TestUnderReplicatedProcessor.java | 15 +++---
18 files changed, 146 insertions(+), 156 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
index 5d647593ef..9dead0feab 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,23 +39,6 @@ public abstract class AbstractOverReplicationHandler
this.placementPolicy = placementPolicy;
}
- /**
- * Identify a new set of datanode(s) to delete the container
- * and form the SCM commands to send it to DN.
- *
- * @param replicas - Set of available container replicas.
- * @param pendingOps - Inflight replications and deletion ops.
- * @param result - Health check result.
- * @param remainingMaintenanceRedundancy - represents that how many nodes go
- * into maintenance.
- * @return Returns the key value pair of destination dn where the command
gets
- * executed and the command itself.
- */
- public abstract Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
- Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
- ContainerHealthResult result, int remainingMaintenanceRedundancy) throws
- IOException;
-
/**
* Identify whether the placement status is actually equal for a
* replica set after removing those filtered replicas.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 00c1a264af..4bd49144d6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -32,12 +33,13 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
/**
* Handles the EC Over replication processing and forming the respective SCM
@@ -69,7 +71,7 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
* executed and the command itself.
*/
@Override
- public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int remainingMaintenanceRedundancy) {
ContainerInfo container = result.getContainerInfo();
@@ -102,13 +104,13 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
LOG.info("The container {} state changed and it is no longer over"
+ " replication. Replica count: {}, healthy replica count: {}",
container.getContainerID(), replicas.size(), healthyReplicas.size());
- return emptyMap();
+ return emptySet();
}
if (!replicaCount.isOverReplicated(true)) {
LOG.info("The container {} with replicas {} will be corrected " +
"by the pending delete", container.getContainerID(), replicas);
- return emptyMap();
+ return emptySet();
}
List<Integer> overReplicatedIndexes =
@@ -118,7 +120,7 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
LOG.warn("The container {} with replicas {} was found over replicated "
+ "by EcContainerReplicaCount, but there are no over replicated "
+ "indexes returned", container.getContainerID(), replicas);
- return emptyMap();
+ return emptySet();
}
final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
@@ -141,10 +143,10 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
LOG.warn("The container {} is over replicated, but no replicas were "
+ "selected to remove by the placement policy. Replicas: {}",
container, replicas);
- return emptyMap();
+ return emptySet();
}
- final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+ final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
// As a sanity check, sum up the current counts of each replica index. When
// processing replicasToRemove, ensure that removing the replica would not
// drop the count of that index to zero.
@@ -165,7 +167,7 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(container.getContainerID(), true);
deleteCommand.setReplicaIndex(r.getReplicaIndex());
- commands.put(r.getDatanodeDetails(), deleteCommand);
+ commands.add(Pair.of(r.getDatanodeDetails(), deleteCommand));
}
if (commands.size() == 0) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 49b29bd46c..256ec44624 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -41,14 +41,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
/**
@@ -114,7 +114,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
* should be retried later.
*/
@Override
- public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
final Set<ContainerReplica> replicas,
final List<ContainerReplicaOp> pendingOps,
final ContainerHealthResult result,
@@ -128,13 +128,13 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
if (replicaCount.isSufficientlyReplicated()) {
LOG.info("The container {} state changed and it's not in under"
+ " replication any more.", container.getContainerID());
- return emptyMap();
+ return emptySet();
}
if (replicaCount.isSufficientlyReplicated(true)) {
LOG.info("The container {} with replicas {} will be sufficiently " +
"replicated after pending replicas are created",
container.getContainerID(), replicaCount.getReplicas());
- return emptyMap();
+ return emptySet();
}
// don't place reconstructed replicas on exclude nodes, since they already
@@ -151,7 +151,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
.collect(Collectors.toList()));
final ContainerID id = container.containerID();
- final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+ final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
try {
final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
for (ContainerReplicaOp op : pendingOps) {
@@ -282,7 +282,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
Pair<ContainerReplica, NodeStatus>> sources,
List<DatanodeDetails> availableSourceNodes,
List<DatanodeDetails> excludedNodes,
- Map<DatanodeDetails, SCMCommand<?>> commands) throws IOException {
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
ContainerInfo container = replicaCount.getContainer();
ECReplicationConfig repConfig =
(ECReplicationConfig)container.getReplicationConfig();
@@ -316,7 +316,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
int2byte(missingIndexes),
repConfig);
// Keeping the first target node as coordinator.
- commands.put(selectedDatanodes.get(0), reconstructionCommand);
+ commands.add(Pair.of(selectedDatanodes.get(0), reconstructionCommand));
}
} else {
LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
@@ -337,7 +337,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
Set<ContainerReplica> replicas,
List<DatanodeDetails> availableSourceNodes,
List<DatanodeDetails> excludedNodes,
- Map<DatanodeDetails, SCMCommand<?>> commands) throws IOException {
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
ContainerInfo container = replicaCount.getContainer();
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
if (decomIndexes.size() > 0) {
@@ -376,7 +376,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
private void processMaintenanceOnlyIndexes(
ECContainerReplicaCount replicaCount, Set<ContainerReplica> replicas,
List<DatanodeDetails> excludedNodes,
- Map<DatanodeDetails, SCMCommand<?>> commands) throws IOException {
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
Set<Integer> maintIndexes = replicaCount.maintenanceOnlyIndexes(true);
if (maintIndexes.isEmpty()) {
return;
@@ -412,7 +412,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
}
private void createReplicateCommand(
- Map<DatanodeDetails, SCMCommand<?>> commands,
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands,
ContainerInfo container, Iterator<DatanodeDetails> iterator,
ContainerReplica replica) {
final boolean push = replicationManager.getConfig().isPush();
@@ -426,7 +426,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
// For EC containers, we need to track the replica index which is
// to be replicated, so add it to the command.
replicateCommand.setReplicaIndex(replica.getReplicaIndex());
- commands.put(push ? source : target, replicateCommand);
+ commands.add(Pair.of(push ? source : target, replicateCommand));
}
private static byte[] int2byte(List<Integer> src) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 3570f45eb7..10dfede065 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
-import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -114,11 +114,11 @@ public abstract class MisReplicationHandler implements
protected abstract ReplicateContainerCommand updateReplicateCommand(
ReplicateContainerCommand command, ContainerReplica replica);
- private Map<DatanodeDetails, SCMCommand<?>> getReplicateCommands(
+ private Set<Pair<DatanodeDetails, SCMCommand<?>>> getReplicateCommands(
ContainerInfo containerInfo,
Set<ContainerReplica> replicasToBeReplicated,
List<DatanodeDetails> targetDns) {
- Map<DatanodeDetails, SCMCommand<?>> commandMap = Maps.newHashMap();
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commandMap = new HashSet<>();
int datanodeIdx = 0;
for (ContainerReplica replica : replicasToBeReplicated) {
if (datanodeIdx == targetDns.size()) {
@@ -132,14 +132,14 @@ public abstract class MisReplicationHandler implements
: ReplicateContainerCommand.fromSources(containerID,
Collections.singletonList(source));
replicateCommand = updateReplicateCommand(replicateCommand, replica);
- commandMap.put(push ? source : target, replicateCommand);
+ commandMap.add(Pair.of(push ? source : target, replicateCommand));
datanodeIdx += 1;
}
return commandMap;
}
@Override
- public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int remainingMaintenanceRedundancy)
throws IOException {
@@ -148,7 +148,7 @@ public abstract class MisReplicationHandler implements
LOG.info("Skipping Mis-Replication for Container {}, " +
"as there are still some pending ops for the container: {}",
container, pendingOps);
- return Collections.emptyMap();
+ return Collections.emptySet();
}
ContainerReplicaCount replicaCount = getContainerReplicaCount(container,
replicas, Collections.emptyList(), remainingMaintenanceRedundancy);
@@ -162,7 +162,7 @@ public abstract class MisReplicationHandler implements
container.getContainerID(),
!replicaCount.isSufficientlyReplicated(),
replicaCount.isOverReplicated());
- return Collections.emptyMap();
+ return Collections.emptySet();
}
List<DatanodeDetails> usedDns = replicas.stream()
@@ -172,7 +172,7 @@ public abstract class MisReplicationHandler implements
usedDns.size()).isPolicySatisfied()) {
LOG.info("Container {} is currently not misreplicated",
container.getContainerID());
- return Collections.emptyMap();
+ return Collections.emptySet();
}
Set<ContainerReplica> sources = filterSources(replicas);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
index 93f536a644..4f7806487c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
@@ -17,11 +17,12 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.io.IOException;
-import java.util.Map;
+import java.util.Set;
/**
* Class used to pick messages from the ReplicationManager over replicated
@@ -50,7 +51,7 @@ public class OverReplicatedProcessor extends
UnhealthyReplicationProcessor
replicationManager.requeueOverReplicatedContainer(healthResult);
}
@Override
- protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
+ protected Set<Pair<DatanodeDetails, SCMCommand<?>>> getDatanodeCommands(
ReplicationManager replicationManager,
ContainerHealthResult.OverReplicatedHealthResult healthResult)
throws IOException {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
index d3bef7d9e2..856027a5af 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -34,7 +35,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -77,7 +77,7 @@ public class RatisOverReplicationHandler
* delete replicas on those datanodes.
*/
@Override
- public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int minHealthyForMaintenance) throws
IOException {
@@ -115,7 +115,7 @@ public class RatisOverReplicationHandler
// verify that this container is actually over replicated
if (!verifyOverReplication(replicaCount)) {
- return Collections.emptyMap();
+ return Collections.emptySet();
}
// get number of excess replicas
@@ -132,7 +132,7 @@ public class RatisOverReplicationHandler
if (eligibleReplicas.size() == 0) {
LOG.info("Did not find any replicas that are eligible to be deleted for"
+
" container {}.", containerInfo);
- return Collections.emptyMap();
+ return Collections.emptySet();
}
return createCommands(containerInfo, eligibleReplicas, excess);
@@ -214,10 +214,10 @@ public class RatisOverReplicationHandler
.collect(Collectors.toList());
}
- private Map<DatanodeDetails, SCMCommand<?>> createCommands(
+ private Set<Pair<DatanodeDetails, SCMCommand<?>>> createCommands(
ContainerInfo containerInfo, List<ContainerReplica> replicas,
int excess) {
- Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
/*
Over replication means we have enough healthy replicas, so unhealthy
@@ -231,8 +231,8 @@ public class RatisOverReplicationHandler
}
if (!ReplicationManager.compareState(
containerInfo.getState(), replica.getState())) {
- commands.put(replica.getDatanodeDetails(),
- createDeleteCommand(containerInfo));
+ commands.add(Pair.of(replica.getDatanodeDetails(),
+ createDeleteCommand(containerInfo)));
unhealthyReplicas.add(replica);
excess--;
}
@@ -253,8 +253,8 @@ public class RatisOverReplicationHandler
if (super.isPlacementStatusActuallyEqualAfterRemove(replicaSet, replica,
containerInfo.getReplicationFactor().getNumber())) {
- commands.put(replica.getDatanodeDetails(),
- createDeleteCommand(containerInfo));
+ commands.add(Pair.of(replica.getDatanodeDetails(),
+ createDeleteCommand(containerInfo)));
excess--;
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index d9d96a566c..6d5a2d266b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.collections.iterators.LoopingIterator;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -34,11 +36,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -77,7 +77,6 @@ public class RatisUnderReplicationHandler
* @param result Health check result indicating under replication.
* @param minHealthyForMaintenance Number of healthy replicas that must be
* available for a DN to enter maintenance
- *
* @return Returns the key value pair of destination dn where the command
gets
* executed and the command itself. If an empty map is returned, it indicates
* the container is no longer unhealthy and can be removed from the unhealthy
@@ -85,7 +84,7 @@ public class RatisUnderReplicationHandler
* should be retried later.
*/
@Override
- public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+ public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int minHealthyForMaintenance)
throws IOException {
@@ -109,7 +108,7 @@ public class RatisUnderReplicationHandler
// verify that this container is still under replicated and we don't have
// sufficient replication after considering pending adds
if (!verifyUnderReplication(replicaCount)) {
- return Collections.emptyMap();
+ return Collections.emptySet();
}
// find sources that can provide replicas
@@ -118,7 +117,7 @@ public class RatisUnderReplicationHandler
if (sourceDatanodes.isEmpty()) {
LOG.warn("Cannot replicate container {} because no healthy replicas " +
"were found.", containerInfo);
- return Collections.emptyMap();
+ return Collections.emptySet();
}
// find targets to send replicas to
@@ -127,7 +126,7 @@ public class RatisUnderReplicationHandler
if (targetDatanodes.isEmpty()) {
LOG.warn("Cannot replicate container {} because no eligible targets " +
"were found.", containerInfo);
- return Collections.emptyMap();
+ return Collections.emptySet();
}
return createReplicationCommands(containerInfo.getContainerID(),
@@ -231,31 +230,28 @@ public class RatisUnderReplicationHandler
replicaCount.additionalReplicaNeeded(), 0, dataSizeRequired);
}
- private Map<DatanodeDetails, SCMCommand<?>> createReplicationCommands(
+ private Set<Pair<DatanodeDetails, SCMCommand<?>>> createReplicationCommands(
long containerID, List<DatanodeDetails> sources,
List<DatanodeDetails> targets) {
- final boolean push = replicationManager.getConfig().isPush()
- && targets.size() <= sources.size();
- // TODO if we need multiple commands per source datanode, we need a small
- // interface change
- Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+ final boolean push = replicationManager.getConfig().isPush();
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
if (push) {
Collections.shuffle(sources);
- for (Iterator<DatanodeDetails> srcIter = sources.iterator(),
+ for (Iterator<DatanodeDetails> srcIter = new LoopingIterator(sources),
targetIter = targets.iterator();
srcIter.hasNext() && targetIter.hasNext();) {
DatanodeDetails source = srcIter.next();
DatanodeDetails target = targetIter.next();
ReplicateContainerCommand command =
ReplicateContainerCommand.toTarget(containerID, target);
- commands.put(source, command);
+ commands.add(Pair.of(source, command));
}
} else {
for (DatanodeDetails target : targets) {
ReplicateContainerCommand command =
ReplicateContainerCommand.fromSources(containerID, sources);
- commands.put(target, command);
+ commands.add(Pair.of(target, command));
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 911058f250..eee68e8623 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -522,7 +523,7 @@ public class ReplicationManager implements SCMService {
}
}
- public Map<DatanodeDetails, SCMCommand<?>> processUnderReplicatedContainer(
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> processUnderReplicatedContainer(
final ContainerHealthResult result) throws IOException {
ContainerID containerID = result.getContainerInfo().containerID();
Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -547,7 +548,7 @@ public class ReplicationManager implements SCMService {
pendingOps, result, ratisMaintenanceMinReplicas);
}
- public Map<DatanodeDetails, SCMCommand<?>> processOverReplicatedContainer(
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> processOverReplicatedContainer(
final ContainerHealthResult result) throws IOException {
ContainerID containerID = result.getContainerInfo().containerID();
Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
index 429c0e14eb..10d3d2d1ef 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
@@ -17,11 +17,12 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.io.IOException;
-import java.util.Map;
+import java.util.Set;
/**
* Class used to pick messages from the ReplicationManager under replicated
@@ -50,7 +51,7 @@ public class UnderReplicatedProcessor extends
UnhealthyReplicationProcessor
}
@Override
- protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
+ protected Set<Pair<DatanodeDetails, SCMCommand<?>>> getDatanodeCommands(
ReplicationManager replicationManager,
ContainerHealthResult.UnderReplicatedHealthResult healthResult)
throws IOException {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
index a6d37c40f6..fe73756e39 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.Set;
/**
@@ -47,8 +47,9 @@ public interface UnhealthyReplicationHandler {
* queue. Any exception indicates that the container is still unhealthy and
* should be retried later.
*/
- Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int remainingMaintenanceRedundancy)
throws IOException;
+
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index 9623222f05..1289981c51 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
/**
* Class used to pick messages from the respective ReplicationManager
@@ -107,17 +109,16 @@ public abstract class
UnhealthyReplicationProcessor<HealthResult extends
/**
* Gets the commands to be run datanode to process the
* container health result.
- * @param rm
- * @param healthResult
* @return Commands to be run on Datanodes
*/
- protected abstract Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
- ReplicationManager rm, HealthResult healthResult)
+ protected abstract Set<Pair<DatanodeDetails, SCMCommand<?>>>
+ getDatanodeCommands(ReplicationManager rm, HealthResult healthResult)
throws IOException;
+
private void processContainer(HealthResult healthResult) throws IOException {
- Map<DatanodeDetails, SCMCommand<?>> cmds = getDatanodeCommands(
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds = getDatanodeCommands(
replicationManager, healthResult);
- for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
+ for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds) {
replicationManager.sendDatanodeCommand(cmd.getValue(),
healthResult.getContainerInfo(), cmd.getKey());
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index e882374a09..4542dd930a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -237,14 +237,13 @@ public class TestECOverReplicationHandler {
ECOverReplicationHandler ecORH =
new ECOverReplicationHandler(policy, nodeManager);
- Map<DatanodeDetails, SCMCommand<?>> commands = ecORH
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecORH
.processAndCreateCommands(availableReplicas, ImmutableList.of(),
health, 1);
Assert.assertEquals(1, commands.size());
- for (SCMCommand<?> cmd : commands.values()) {
- Assert.assertEquals(1, ((DeleteContainerCommand)cmd).getReplicaIndex());
- }
+ SCMCommand<?> cmd = commands.iterator().next().getValue();
+ Assert.assertEquals(1, ((DeleteContainerCommand)cmd).getReplicaIndex());
}
private void testOverReplicationWithIndexes(
@@ -257,7 +256,7 @@ public class TestECOverReplicationHandler {
Mockito.mock(ContainerHealthResult.OverReplicatedHealthResult.class);
Mockito.when(result.getContainerInfo()).thenReturn(container);
- Map<DatanodeDetails, SCMCommand<?>> commands = ecORH
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecORH
.processAndCreateCommands(availableReplicas, pendingOps,
result, 1);
@@ -268,8 +267,8 @@ public class TestECOverReplicationHandler {
Assert.assertEquals(totalDeleteCommandNum, commands.size());
// Each command should have a non-zero replica index
- commands.forEach((datanode, command) -> Assert.assertNotEquals(0,
- ((DeleteContainerCommand)command).getReplicaIndex()));
+ commands.forEach(pair -> Assert.assertNotEquals(0,
+ ((DeleteContainerCommand) pair.getValue()).getReplicaIndex()));
// command num of each index should be equal to the excess num
// of this index
@@ -278,8 +277,8 @@ public class TestECOverReplicationHandler {
ContainerReplica::getDatanodeDetails,
ContainerReplica::getReplicaIndex));
Map<Integer, Integer> index2commandNum = new HashMap<>();
- commands.keySet().forEach(dd ->
- index2commandNum.merge(datanodeDetails2Index.get(dd), 1, Integer::sum)
+ commands.forEach(pair -> index2commandNum.merge(
+ datanodeDetails2Index.get(pair.getKey()), 1, Integer::sum)
);
index2commandNum.keySet().forEach(i -> {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 28961646e6..152f4927a7 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -51,7 +51,6 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -155,13 +154,13 @@ public class TestECUnderReplicationHandler {
.createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5));
- Map<DatanodeDetails, SCMCommand<?>> cmds =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
testUnderReplicationWithMissingIndexes(
Lists.emptyList(), availableReplicas, 1, 0, policy);
Assertions.assertEquals(1, cmds.size());
// Check the replicate command has index 1 set
- ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds.values()
- .iterator().next();
+ ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds
+ .iterator().next().getValue();
Assertions.assertEquals(1, cmd.getReplicaIndex());
}
@@ -317,13 +316,14 @@ public class TestECUnderReplicationHandler {
// returns, which in this case is a delete command for replica index 4.
availableReplicas.add(overRepReplica);
- Map<DatanodeDetails, SCMCommand<?>> expectedDelete = new HashMap<>();
- expectedDelete.put(overRepReplica.getDatanodeDetails(),
- createDeleteContainerCommand(container, overRepReplica));
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> expectedDelete =
+ new HashSet<>();
+ expectedDelete.add(Pair.of(overRepReplica.getDatanodeDetails(),
+ createDeleteContainerCommand(container, overRepReplica)));
Mockito.when(replicationManager.processOverReplicatedContainer(
underRep)).thenReturn(expectedDelete);
- Map<DatanodeDetails, SCMCommand<?>> commands =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
ecURH.processAndCreateCommands(availableReplicas,
Collections.emptyList(), underRep, 2);
Mockito.verify(replicationManager, times(1))
@@ -374,16 +374,18 @@ public class TestECUnderReplicationHandler {
availableReplicas.add(toAdd);
}
- Map<DatanodeDetails, SCMCommand<?>> commands =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
ecURH.processAndCreateCommands(availableReplicas,
Collections.emptyList(), underRep, 2);
Mockito.verify(replicationManager, times(0))
.processOverReplicatedContainer(underRep);
Assertions.assertEquals(1, commands.size());
+ Pair<DatanodeDetails, SCMCommand<?>> pair = commands.iterator().next();
+ Assertions.assertEquals(newNode, pair.getKey());
Assertions.assertEquals(StorageContainerDatanodeProtocolProtos
.SCMCommandProto.Type.reconstructECContainersCommand,
- commands.get(newNode).getType());
+ pair.getValue().getType());
}
}
@@ -420,13 +422,13 @@ public class TestECUnderReplicationHandler {
4, IN_SERVICE, CLOSED);
availableReplicas.add(overRepReplica);
- Map<DatanodeDetails, SCMCommand<?>> expectedDelete = new HashMap<>();
- expectedDelete.put(overRepReplica.getDatanodeDetails(),
- createDeleteContainerCommand(container, overRepReplica));
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> expectedDelete = new HashSet<>();
+ expectedDelete.add(Pair.of(overRepReplica.getDatanodeDetails(),
+ createDeleteContainerCommand(container, overRepReplica)));
Mockito.when(replicationManager.processOverReplicatedContainer(
underRep)).thenReturn(expectedDelete);
- Map<DatanodeDetails, SCMCommand<?>> commands =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
ecURH.processAndCreateCommands(availableReplicas,
Collections.emptyList(), underRep, 1);
Mockito.verify(replicationManager, times(1))
@@ -469,12 +471,12 @@ public class TestECUnderReplicationHandler {
.createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 1),
Pair.of(IN_MAINTENANCE, 1), Pair.of(IN_MAINTENANCE, 1),
Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
- Map<DatanodeDetails, SCMCommand<?>> cmds =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3),
availableReplicas, 0, 0, policy);
Assertions.assertEquals(1, cmds.size());
ReconstructECContainersCommand cmd =
- (ReconstructECContainersCommand) cmds.values().iterator().next();
+ (ReconstructECContainersCommand) cmds.iterator().next().getValue();
// Ensure that all source nodes are IN_SERVICE, we should not have picked
// the non in-service nodes for index 1.
for (ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex s
@@ -513,7 +515,7 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
ecPlacementPolicy, conf, nodeManager, replicationManager);
- Map<DatanodeDetails, SCMCommand<?>> commands =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
handler.processAndCreateCommands(availableReplicas,
Collections.emptyList(), result, 1);
Assertions.assertEquals(1, commands.size());
@@ -562,11 +564,11 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
ecPlacementPolicy, conf, nodeManager, replicationManager);
- Map<DatanodeDetails, SCMCommand<?>> commands =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
handler.processAndCreateCommands(availableReplicas, pendingOps, result,
1);
Assertions.assertEquals(1, commands.size());
- Assertions.assertFalse(commands.containsKey(dn));
+ Assertions.assertNotEquals(dn, commands.iterator().next().getKey());
}
@Test
@@ -577,12 +579,12 @@ public class TestECUnderReplicationHandler {
ContainerReplica decomReplica = ReplicationTestUtil.createContainerReplica(
container.containerID(), 2, DECOMMISSIONING, CLOSED);
availableReplicas.add(decomReplica);
- Map<DatanodeDetails, SCMCommand<?>> cmds =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
testUnderReplicationWithMissingIndexes(Collections.emptyList(),
availableReplicas, 1, 0, policy);
Assertions.assertEquals(1, cmds.size());
ReplicateContainerCommand cmd =
- (ReplicateContainerCommand) cmds.values().iterator().next();
+ (ReplicateContainerCommand) cmds.iterator().next().getValue();
List<DatanodeDetails> sources = cmd.getSourceDatanodes();
Assertions.assertEquals(1, sources.size());
@@ -599,7 +601,7 @@ public class TestECUnderReplicationHandler {
container.containerID(), 2, ENTERING_MAINTENANCE, CLOSED);
availableReplicas.add(maintReplica);
- Map<DatanodeDetails, SCMCommand<?>> cmds =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
testUnderReplicationWithMissingIndexes(Collections.emptyList(),
availableReplicas, 0, 1, policy);
Assertions.assertEquals(0, cmds.size());
@@ -611,7 +613,7 @@ public class TestECUnderReplicationHandler {
Assertions.assertEquals(1, cmds.size());
ReplicateContainerCommand cmd =
- (ReplicateContainerCommand) cmds.values().iterator().next();
+ (ReplicateContainerCommand) cmds.iterator().next().getValue();
List<DatanodeDetails> sources = cmd.getSourceDatanodes();
Assertions.assertEquals(1, sources.size());
@@ -619,7 +621,7 @@ public class TestECUnderReplicationHandler {
cmd.getSourceDatanodes().get(0));
}
- public Map<DatanodeDetails, SCMCommand<?>>
+ public Set<Pair<DatanodeDetails, SCMCommand<?>>>
testUnderReplicationWithMissingIndexes(
List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas,
int decomIndexes, int maintenanceIndexes,
@@ -632,11 +634,9 @@ public class TestECUnderReplicationHandler {
Mockito.when(result.isUnrecoverable()).thenReturn(false);
Mockito.when(result.getContainerInfo()).thenReturn(container);
- Map<DatanodeDetails, SCMCommand<?>> datanodeDetailsSCMCommandMap = ecURH
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecURH
.processAndCreateCommands(availableReplicas, ImmutableList.of(),
result, remainingMaintenanceRedundancy);
- Set<Map.Entry<DatanodeDetails, SCMCommand<?>>> entries =
- datanodeDetailsSCMCommandMap.entrySet();
int replicateCommand = 0;
int reconstructCommand = 0;
byte[] missingIndexesByteArr = new byte[missingIndexes.size()];
@@ -646,7 +646,7 @@ public class TestECUnderReplicationHandler {
boolean shouldReconstructCommandExist =
missingIndexes.size() > 0 && missingIndexes.size() <= repConfig
.getParity();
- for (Map.Entry<DatanodeDetails, SCMCommand<?>> dnCommand : entries) {
+ for (Map.Entry<DatanodeDetails, SCMCommand<?>> dnCommand : commands) {
if (dnCommand.getValue() instanceof ReplicateContainerCommand) {
replicateCommand++;
} else if (dnCommand
@@ -666,7 +666,7 @@ public class TestECUnderReplicationHandler {
replicateCommand);
Assertions.assertEquals(shouldReconstructCommandExist ? 1 : 0,
reconstructCommand);
- return datanodeDetailsSCMCommandMap;
+ return commands;
}
private DeleteContainerCommand createDeleteContainerCommand(
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index 2059670417..9afa9e1398 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -40,6 +41,7 @@ import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -158,14 +160,14 @@ public abstract class TestMisReplicationHandler {
Map<DatanodeDetails, Integer> copyReplicaIdxMap = copy.stream()
.collect(Collectors.toMap(ContainerReplica::getDatanodeDetails,
ContainerReplica::getReplicaIndex));
- Map<DatanodeDetails, SCMCommand<?>> datanodeDetailsSCMCommandMap =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
misReplicationHandler.processAndCreateCommands(availableReplicas,
pendingOp, result, maintenanceCnt);
- Assertions.assertEquals(expectedNumberOfNodes,
- datanodeDetailsSCMCommandMap.size());
- Assertions.assertTrue(datanodeDetailsSCMCommandMap.keySet()
- .containsAll(targetNodes));
- for (SCMCommand<?> command : datanodeDetailsSCMCommandMap.values()) {
+ Assertions.assertEquals(expectedNumberOfNodes, commands.size());
+ Assertions.assertEquals(new HashSet<>(targetNodes),
+ commands.stream().map(Pair::getKey).collect(Collectors.toSet()));
+ for (Pair<DatanodeDetails, SCMCommand<?>> pair : commands) {
+ SCMCommand<?> command = pair.getValue();
Assertions.assertTrue(command.getType() == replicateContainerCommand);
ReplicateContainerCommand replicateContainerCommand =
(ReplicateContainerCommand) command;
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
index cbaa9a8afe..910ba75f98 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -32,8 +33,8 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import static org.mockito.ArgumentMatchers.any;
@@ -67,11 +68,11 @@ public class TestOverReplicatedProcessor {
.thenReturn(
new ContainerHealthResult.OverReplicatedHealthResult(container, 3,
false), null);
- Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
DeleteContainerCommand cmd =
new DeleteContainerCommand(container.getContainerID());
cmd.setReplicaIndex(5);
- commands.put(MockDatanodeDetails.randomDatanodeDetails(), cmd);
+ commands.add(Pair.of(MockDatanodeDetails.randomDatanodeDetails(), cmd));
Mockito
.when(replicationManager.processOverReplicatedContainer(any()))
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
index 5b0b762307..4e1825492e 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@ -42,8 +43,8 @@ import org.slf4j.event.Level;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
@@ -195,10 +196,12 @@ public class TestRatisOverReplicationHandler {
Mockito.argThat(list -> list.size() <= 4), Mockito.anyInt()))
.thenReturn(new ContainerPlacementStatusDefault(1, 2, 3));
- Map<DatanodeDetails, SCMCommand<?>> commands = testProcessing(replicas,
- Collections.emptyList(), getOverReplicatedHealthResult(), 2);
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = testProcessing(
+ replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 2);
+ Set<DatanodeDetails> datanodes =
+ commands.stream().map(Pair::getKey).collect(Collectors.toSet());
Assert.assertTrue(
- commands.containsKey(quasiClosedReplica.getDatanodeDetails()));
+ datanodes.contains(quasiClosedReplica.getDatanodeDetails()));
}
@Test
@@ -217,12 +220,14 @@ public class TestRatisOverReplicationHandler {
replicas.add(decommissioningReplica);
replicas.add(maintenanceReplica);
- Map<DatanodeDetails, SCMCommand<?>> commands = testProcessing(replicas,
- Collections.emptyList(), getOverReplicatedHealthResult(), 1);
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = testProcessing(
+ replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1);
+ Set<DatanodeDetails> datanodes =
+ commands.stream().map(Pair::getKey).collect(Collectors.toSet());
Assert.assertFalse(
- commands.containsKey(decommissioningReplica.getDatanodeDetails()));
+ datanodes.contains(decommissioningReplica.getDatanodeDetails()));
Assert.assertFalse(
- commands.containsKey(maintenanceReplica.getDatanodeDetails()));
+ datanodes.contains(maintenanceReplica.getDatanodeDetails()));
}
@Test
@@ -255,14 +260,14 @@ public class TestRatisOverReplicationHandler {
* the handler
* @return map of commands
*/
- private Map<DatanodeDetails, SCMCommand<?>> testProcessing(
+ private Set<Pair<DatanodeDetails, SCMCommand<?>>> testProcessing(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult healthResult,
int expectNumCommands) throws IOException {
RatisOverReplicationHandler handler =
new RatisOverReplicationHandler(policy, nodeManager);
- Map<DatanodeDetails, SCMCommand<?>> commands =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
handler.processAndCreateCommands(replicas, pendingOps,
healthResult, 2);
Assert.assertEquals(expectNumCommands, commands.size());
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index efed912e74..29e035f4de 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -44,7 +44,6 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
@@ -220,7 +219,7 @@ public class TestRatisUnderReplicationHandler {
new RatisUnderReplicationHandler(policy, conf, nodeManager,
replicationManager);
- Map<DatanodeDetails, SCMCommand<?>> commands =
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
handler.processAndCreateCommands(replicas, pendingOps,
healthResult, minHealthyForMaintenance);
Assert.assertEquals(expectNumCommands, commands.size());
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
index a56030811d..48cc5476ab 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -34,9 +35,9 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
+import java.util.Set;
import static org.mockito.ArgumentMatchers.any;
@@ -84,10 +85,10 @@ public class TestUnderReplicatedProcessor {
targetNodes.add(MockDatanodeDetails.randomDatanodeDetails());
byte[] missingIndexes = {4, 5};
- Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
- commands.put(MockDatanodeDetails.randomDatanodeDetails(),
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
+ commands.add(Pair.of(MockDatanodeDetails.randomDatanodeDetails(),
new ReconstructECContainersCommand(container.getContainerID(),
- sourceNodes, targetNodes, missingIndexes, repConfig));
+ sourceNodes, targetNodes, missingIndexes, repConfig)));
Mockito.when(replicationManager
.processUnderReplicatedContainer(any()))
@@ -115,8 +116,8 @@ public class TestUnderReplicatedProcessor {
container.getContainerID(), sourceDns);
rcc.setReplicaIndex(3);
- Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
- commands.put(targetDn, rcc);
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
+ commands.add(Pair.of(targetDn, rcc));
Mockito.when(replicationManager
.processUnderReplicatedContainer(any()))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]