This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e360de0337 HDDS-8727. Defer non-critical partial EC reconstruction
(#4810)
e360de0337 is described below
commit e360de03375e5204857626b6949e7a2f61721d29
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Jun 1 20:48:48 2023 +0200
HDDS-8727. Defer non-critical partial EC reconstruction (#4810)
---
.../replication/ECUnderReplicationHandler.java | 103 ++--
.../pipeline/InsufficientDatanodesException.java | 23 +-
.../hdds/scm/pipeline/SimplePipelineProvider.java | 12 +-
.../replication/TestECUnderReplicationHandler.java | 556 +++++++++++++--------
4 files changed, 444 insertions(+), 250 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 8b47e11fa8..fa189acba1 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -64,7 +65,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
private final long currentContainerSize;
private final ReplicationManager replicationManager;
- public ECUnderReplicationHandler(final PlacementPolicy containerPlacement,
+ ECUnderReplicationHandler(final PlacementPolicy containerPlacement,
final ConfigurationSource conf, ReplicationManager replicationManager) {
this.containerPlacement = containerPlacement;
this.currentContainerSize = (long) conf
@@ -131,7 +132,6 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
replicationManager);
List<DatanodeDetails> excludedNodes
= excludedAndUsedNodes.getExcludedNodes();
- excludedNodes.addAll(replicationManager.getExcludedNodes());
List<DatanodeDetails> usedNodes
= excludedAndUsedNodes.getUsedNodes();
@@ -164,6 +164,8 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
| CommandTargetOverloadedException e) {
firstException = e;
}
+
+ excludedNodes.addAll(replicationManager.getExcludedNodes());
try {
commandsSent += processDecommissioningIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes, usedNodes);
@@ -270,10 +272,9 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
}
/**
- * Processes replicas that are in maintenance nodes and should need
+ * Processes replicas that are on in-service nodes and should need
* additional copies.
* @return number of commands sent
- * @throws IOException
*/
private int processMissingIndexes(
ECContainerReplicaCount replicaCount, Map<Integer,
@@ -285,26 +286,59 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
ECReplicationConfig repConfig =
(ECReplicationConfig)container.getReplicationConfig();
List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
- if (missingIndexes.size() == 0) {
+ final int expectedTargetCount = missingIndexes.size();
+ if (expectedTargetCount == 0) {
return 0;
}
int commandsSent = 0;
if (sources.size() >= repConfig.getData()) {
- int expectedTargets = missingIndexes.size();
- final List<DatanodeDetails> selectedDatanodes =
- ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
- expectedTargets, usedNodes, excludedNodes, currentContainerSize,
- container);
+ Set<DatanodeDetails> excludedDueToLoad =
+ replicationManager.getExcludedNodes();
+ final boolean hasOverloaded = !excludedDueToLoad.isEmpty();
+ final List<DatanodeDetails> excludedOrOverloadedNodes = hasOverloaded
+ ? ImmutableList.copyOf(ImmutableSet.<DatanodeDetails>builder()
+ .addAll(excludedNodes)
+ .addAll(excludedDueToLoad)
+ .build())
+ : excludedNodes;
+
+ // placement with overloaded nodes excluded
+ final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
+ container, expectedTargetCount, usedNodes, excludedOrOverloadedNodes
+ );
+ final int targetCount = selectedDatanodes.size();
+
+ if (hasOverloaded &&
+ // selection allows partial recovery
+ 0 < targetCount && targetCount < expectedTargetCount &&
+ // recovery is not yet critical
+ expectedTargetCount < repConfig.getParity()) {
+
+ // check if placement exists when overloaded nodes are not excluded
+ final List<DatanodeDetails> targetsMaybeOverloaded = getTargetDatanodes(
+ container, expectedTargetCount, usedNodes, excludedNodes);
+
+ if (targetsMaybeOverloaded.size() == expectedTargetCount) {
+ final int overloadedCount = expectedTargetCount - targetCount;
+ LOG.info("Deferring reconstruction of container {}, which requires {}"
+ + " target nodes to be fully reconstructed, but {} selected"
+ + " nodes are currently overloaded.",
+ container.getContainerID(), expectedTargetCount, overloadedCount);
+
+ throw new InsufficientDatanodesException(expectedTargetCount,
+ targetCount);
+ }
+ }
// If we got less targets than missing indexes, we need to prune the
- // missing index list so it only tries to recover the nummber of indexes
+ // missing index list so it only tries to recover the number of indexes
// we have targets for.
- if (selectedDatanodes.size() < expectedTargets) {
- missingIndexes.subList(selectedDatanodes.size(),
- missingIndexes.size()).clear();
+ if (targetCount < expectedTargetCount) {
+ missingIndexes.subList(targetCount, expectedTargetCount).clear();
}
- if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
+ if (0 < targetCount &&
+ validatePlacement(availableSourceNodes, selectedDatanodes)) {
usedNodes.addAll(selectedDatanodes);
// TODO - what are we adding all the selected nodes to available
// sources?
@@ -337,13 +371,12 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
}
commandsSent++;
}
- if (selectedDatanodes.size() != expectedTargets) {
+ if (targetCount != expectedTargetCount) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
" to fully reconstruct container {}. Requested {} received {}",
- container.getContainerID(), expectedTargets,
- selectedDatanodes.size());
- throw new InsufficientDatanodesException(expectedTargets,
- selectedDatanodes.size());
+ container.getContainerID(), expectedTargetCount, targetCount);
+ throw new InsufficientDatanodesException(expectedTargetCount,
+ targetCount);
}
} else {
LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
@@ -355,11 +388,21 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
return commandsSent;
}
+ private List<DatanodeDetails> getTargetDatanodes(
+ ContainerInfo container, int requiredNodes,
+ List<DatanodeDetails> usedNodes,
+ List<DatanodeDetails> excludedNodes
+ ) throws SCMException {
+ return ReplicationManagerUtil.getTargetDatanodes(
+ containerPlacement, requiredNodes,
+ usedNodes, excludedNodes,
+ currentContainerSize, container);
+ }
+
/**
- * Processes replicas that are in maintenance nodes and should need
+ * Processes replicas that are in decommissioning nodes and should need
* additional copies.
* @return number of commands sent
- * @throws IOException
*/
private int processDecommissioningIndexes(
ECContainerReplicaCount replicaCount,
@@ -371,10 +414,8 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
int commandsSent = 0;
if (decomIndexes.size() > 0) {
- final List<DatanodeDetails> selectedDatanodes =
- ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
- decomIndexes.size(), usedNodes, excludedNodes,
- currentContainerSize, container);
+ final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
+ container, decomIndexes.size(), usedNodes, excludedNodes);
if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
usedNodes.addAll(selectedDatanodes);
@@ -429,12 +470,10 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
/**
* Processes replicas that are in maintenance nodes and should need
* additional copies.
- * @param replicaCount
* @param sources Map of Replica Index to a pair of ContainerReplica and
* NodeStatus. This is the list of available replicas.
* @param excludedNodes nodes that should not be targets for new copies
- * @@return number of commands sent
- * @throws IOException
+ * @return number of commands sent
*/
private int processMaintenanceOnlyIndexes(
ECContainerReplicaCount replicaCount,
@@ -453,9 +492,9 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
if (additionalMaintenanceCopiesNeeded == 0) {
return 0;
}
- List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
- containerPlacement, maintIndexes.size(), usedNodes, excludedNodes,
- currentContainerSize, container);
+ List<DatanodeDetails> targets = getTargetDatanodes(
+ container, maintIndexes.size(), usedNodes, excludedNodes
+ );
usedNodes.addAll(targets);
Iterator<DatanodeDetails> iterator = targets.iterator();
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InsufficientDatanodesException.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InsufficientDatanodesException.java
index 53f76320eb..c7ab1adcd0 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InsufficientDatanodesException.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InsufficientDatanodesException.java
@@ -23,21 +23,30 @@ import java.io.IOException;
/**
* Exception thrown when there are not enough Datanodes to create a pipeline.
*/
-public class InsufficientDatanodesException extends IOException {
+public final class InsufficientDatanodesException extends IOException {
+ private final int required;
+ private final int available;
- public InsufficientDatanodesException() {
- super();
- }
-
- public InsufficientDatanodesException(String message) {
+ public InsufficientDatanodesException(int required, int available,
+ String message) {
super(message);
+ this.required = required;
+ this.available = available;
}
public InsufficientDatanodesException(int required, int available) {
- super("Not enough datanodes" +
+ this(required, available, "Not enough datanodes" +
", requested: " + required +
", found: " + available
);
}
+
+ public int getRequiredNodes() {
+ return required;
+ }
+
+ public int getAvailableNodes() {
+ return available;
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index a61ed2719a..ac390ea366 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -53,11 +53,13 @@ public class SimplePipelineProvider
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException {
List<DatanodeDetails> dns = pickNodesNotUsed(replicationConfig);
- if (dns.size() < replicationConfig.getRequiredNodes()) {
- String e = String
- .format("Cannot create pipeline of factor %d using %d nodes.",
- replicationConfig.getRequiredNodes(), dns.size());
- throw new InsufficientDatanodesException(e);
+ int available = dns.size();
+ int required = replicationConfig.getRequiredNodes();
+ if (available < required) {
+ String msg = String.format(
+ "Cannot create pipeline of factor %d using %d nodes.",
+ required, available);
+ throw new InsufficientDatanodesException(required, available, msg);
}
Collections.shuffle(dns);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 4ee5406b5a..6fc6b1c0eb 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
@@ -46,43 +47,60 @@ import
org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.assertj.core.util.Lists;
-import org.junit.Assert;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests the ECUnderReplicationHandling functionality.
@@ -99,9 +117,9 @@ public class TestECUnderReplicationHandler {
private PlacementPolicy ecPlacementPolicy;
private int remainingMaintenanceRedundancy = 1;
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
- private AtomicBoolean throwOverloadedExceptionOnReplication
+ private final AtomicBoolean throwOverloadedExceptionOnReplication
= new AtomicBoolean(false);
- private AtomicBoolean throwOverloadedExceptionOnReconstruction
+ private final AtomicBoolean throwOverloadedExceptionOnReconstruction
= new AtomicBoolean(false);
@BeforeEach
@@ -114,13 +132,13 @@ public class TestECUnderReplicationHandler {
dd.getPersistedOpState(), HddsProtos.NodeState.HEALTHY, 0);
}
};
- replicationManager = Mockito.mock(ReplicationManager.class);
+ replicationManager = mock(ReplicationManager.class);
ReplicationManager.ReplicationManagerConfiguration rmConf =
new ReplicationManager.ReplicationManagerConfiguration();
- Mockito.when(replicationManager.getConfig())
+ when(replicationManager.getConfig())
.thenReturn(rmConf);
- Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
+ when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
.thenAnswer(invocation -> {
DatanodeDetails dd = invocation.getArgument(0);
return new NodeStatus(dd.getPersistedOpState(),
@@ -139,67 +157,155 @@ public class TestECUnderReplicationHandler {
conf = SCMTestUtils.getConf();
repConfig = new ECReplicationConfig(DATA, PARITY);
- container = ReplicationTestUtil
- .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
+ container = createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
policy = ReplicationTestUtil
.getSimpleTestPlacementPolicy(nodeManager, conf);
NodeSchema[] schemas =
new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
- ecPlacementPolicy = Mockito.mock(PlacementPolicy.class);
- Mockito.when(ecPlacementPolicy.validateContainerPlacement(
- anyList(), anyInt()))
+ ecPlacementPolicy = mock(PlacementPolicy.class);
+ when(ecPlacementPolicy.validateContainerPlacement(anyList(), anyInt()))
.thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
}
+ @ParameterizedTest
+ @ValueSource(strings = {"rs-6-3-1024k", "rs-10-4-1024k"})
+ void defersNonCriticalPartialReconstruction(String rep) throws IOException {
+ final ECReplicationConfig ec = new ECReplicationConfig(rep);
+ final int data = ec.getData();
+ final int parity = ec.getParity();
+ ECUnderReplicationHandler subject = new ECUnderReplicationHandler(
+ ecPlacementPolicy, conf, replicationManager);
+ UnderReplicatedHealthResult result = mockUnderReplicated(ec);
+
+ Set<DatanodeDetails> excluded = excludeInReplicationManager(1);
+
+ final int remainingRedundancy = 1;
+ PlacementPolicySpy spy = new PlacementPolicySpy(ecPlacementPolicy,
+ data + parity + excluded.size() - 1);
+ Set<ContainerReplica> replicas = createReplicas(data + remainingRedundancy);
+
+ // WHEN
+ InsufficientDatanodesException e = assertThrows(
+ InsufficientDatanodesException.class,
+ () -> subject.processAndSendCommands(replicas, emptyList(), result, 2));
+
+ // THEN
+ assertEquals(2, spy.callCount());
+ assertExcluded(excluded, spy.excludedNodes(0));
+ assertUsedNodes(replicas, spy.usedNodes(0));
+ assertExcluded(emptySet(), spy.excludedNodes(1));
+ assertUsedNodes(replicas, spy.usedNodes(1));
+ assertEquals(parity - remainingRedundancy, e.getRequiredNodes());
+ assertEquals(e.getRequiredNodes() - excluded.size(), e.getAvailableNodes());
+ verify(replicationManager, never())
+ .sendThrottledReconstructionCommand(any(), any());
+ }
+
+ private static UnderReplicatedHealthResult mockUnderReplicated(
+ ECReplicationConfig ec) {
+ UnderReplicatedHealthResult result =
+ mock(UnderReplicatedHealthResult.class);
+ when(result.getContainerInfo())
+ .thenReturn(createContainer(HddsProtos.LifeCycleState.CLOSED, ec));
+ return result;
+ }
+
+ private static void assertExcluded(Set<DatanodeDetails> excluded,
+ List<DatanodeDetails> excludedNodes) {
+ assertEquals(excluded, new TreeSet<>(excludedNodes));
+ }
+
+ private Set<DatanodeDetails> excludeInReplicationManager(int count) {
+ Set<DatanodeDetails> excluded = IntStream.range(0, count)
+ .mapToObj(i -> MockDatanodeDetails.randomDatanodeDetails())
+ .collect(toSet());
+ when(replicationManager.getExcludedNodes())
+ .thenReturn(excluded);
+ return excluded;
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"rs-3-2-1024k", "rs-6-3-1024k", "rs-10-4-1024k"})
+ void performsCriticalPartialReconstruction(String rep) throws IOException {
+ final ECReplicationConfig ec = new ECReplicationConfig(rep);
+ final int data = ec.getData();
+ final int parity = ec.getParity();
+ ECUnderReplicationHandler subject = new ECUnderReplicationHandler(
+ ecPlacementPolicy, conf, replicationManager);
+ UnderReplicatedHealthResult result = mockUnderReplicated(ec);
+
+ DatanodeDetails excludedByRM = MockDatanodeDetails.randomDatanodeDetails();
+ Set<DatanodeDetails> excluded = singleton(excludedByRM);
+ when(replicationManager.getExcludedNodes())
+ .thenReturn(excluded);
+
+ final int remainingRedundancy = 0;
+ PlacementPolicySpy spy = new PlacementPolicySpy(ecPlacementPolicy,
+ data + parity + excluded.size() - 1);
+ Set<ContainerReplica> replicas = createReplicas(data + remainingRedundancy);
+
+ // WHEN
+ InsufficientDatanodesException e = assertThrows(
+ InsufficientDatanodesException.class,
+ () -> subject.processAndSendCommands(replicas, emptyList(), result, 2));
+
+ // THEN
+ assertEquals(1, spy.callCount());
+ assertEquals(singletonList(excludedByRM), spy.excludedNodes(0));
+ assertUsedNodes(replicas, spy.usedNodes(0));
+ assertEquals(parity - remainingRedundancy, e.getRequiredNodes());
+ assertEquals(e.getRequiredNodes() - excluded.size(), e.getAvailableNodes());
+ verify(replicationManager, times(1))
+ .sendThrottledReconstructionCommand(any(), any());
+ }
+
@Test
void excludesOverloadedNodes() throws IOException {
ECUnderReplicationHandler subject = new ECUnderReplicationHandler(
ecPlacementPolicy, conf, replicationManager);
- Set<ContainerReplica> replicas = ReplicationTestUtil
- .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3));
- ContainerHealthResult.UnderReplicatedHealthResult result =
- Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
- Mockito.when(result.getContainerInfo()).thenReturn(container);
+ UnderReplicatedHealthResult result =
+ mock(UnderReplicatedHealthResult.class);
+ when(result.getContainerInfo()).thenReturn(container);
DatanodeDetails excludedByRM = MockDatanodeDetails.randomDatanodeDetails();
- Mockito.when(replicationManager.getExcludedNodes())
+ when(replicationManager.getExcludedNodes())
.thenReturn(singleton(excludedByRM));
- ArgumentCaptor<List<DatanodeDetails>> captor =
- ArgumentCaptor.forClass(List.class);
- // The used list is modified after it is passed to the placement policy,
- // so a plain Captor won't work.
- AtomicReference<List<DatanodeDetails>> usedList = new AtomicReference<>();
- Mockito.when(ecPlacementPolicy.chooseDatanodes(any(), captor.capture(),
- any(), anyInt(), anyLong(), anyLong())
- ).thenAnswer(invocationOnMock -> {
- usedList.set(new ArrayList<>(invocationOnMock.getArgument(0)));
- int numNodes = invocationOnMock.getArgument(3);
- List<DatanodeDetails> targets = new ArrayList<>();
- for (int i = 0; i < numNodes; i++) {
- targets.add(MockDatanodeDetails.randomDatanodeDetails());
- }
- return targets;
- });
+ PlacementPolicySpy spy = new PlacementPolicySpy(ecPlacementPolicy,
+ repConfig.getRequiredNodes() + 1);
+ // WHEN
+ Set<ContainerReplica> replicas = createReplicas(3);
subject.processAndSendCommands(
- replicas, Collections.emptyList(), result, 2);
+ replicas, emptyList(), result, 2);
+
+ // THEN
+ assertEquals(1, spy.callCount());
+ assertEquals(singletonList(excludedByRM), spy.excludedNodes(0));
+ assertUsedNodes(replicas, spy.usedNodes(0));
+ verify(replicationManager, times(1))
+ .sendThrottledReconstructionCommand(any(), any());
+ }
- Assertions.assertTrue(captor.getValue().contains(excludedByRM));
- Assertions.assertEquals(3, usedList.get().size());
+ private static void assertUsedNodes(Set<ContainerReplica> replicas,
+ List<DatanodeDetails> usedNodes) {
+ assertEquals(replicas.size(), usedNodes.size());
for (ContainerReplica r : replicas) {
- Assertions.assertTrue(
- usedList.get().contains(r.getDatanodeDetails()));
+ assertTrue(usedNodes.contains(r.getDatanodeDetails()));
}
}
+ private static Set<ContainerReplica> createReplicas(int count) {
+ ContainerID id = ContainerID.valueOf(1);
+ return IntStream.rangeClosed(1, count)
+ .mapToObj(i -> createContainerReplica(id, i, IN_SERVICE, CLOSED))
+ .collect(toSet());
+ }
+
@Test
public void testUnderReplicationWithMissingParityIndex5() throws IOException
{
- Set<ContainerReplica> availableReplicas = ReplicationTestUtil
- .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
+ Set<ContainerReplica> availableReplicas = createReplicas(4);
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
availableReplicas, 0, 0, policy);
}
@@ -215,8 +321,7 @@ public class TestECUnderReplicationHandler {
@Test
public void testUnderReplicationWithMissingIndex2345() throws IOException {
- Set<ContainerReplica> availableReplicas =
- ReplicationTestUtil.createReplicas(Pair.of(IN_SERVICE, 1));
+ Set<ContainerReplica> availableReplicas = createReplicas(1);
testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3, 4, 5),
availableReplicas, 0, 0, policy);
}
@@ -230,15 +335,13 @@ public class TestECUnderReplicationHandler {
@Test
public void testThrowsWhenTargetsOverloaded() throws IOException {
- Set<ContainerReplica> availableReplicas = ReplicationTestUtil
- .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
+ Set<ContainerReplica> availableReplicas = createReplicas(4);
doThrow(new CommandTargetOverloadedException("Overloaded"))
.when(replicationManager).sendThrottledReconstructionCommand(
any(), any());
- Assertions.assertThrows(CommandTargetOverloadedException.class, () ->
+ assertThrows(CommandTargetOverloadedException.class, () ->
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
availableReplicas, 0, 0, policy));
}
@@ -252,11 +355,11 @@ public class TestECUnderReplicationHandler {
Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
testUnderReplicationWithMissingIndexes(
Lists.emptyList(), availableReplicas, 1, 0, policy);
- Assertions.assertEquals(1, cmds.size());
+ assertEquals(1, cmds.size());
// Check the replicate command has index 1 set
ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds
.iterator().next().getValue();
- Assertions.assertEquals(1, cmd.getReplicaIndex());
+ assertEquals(1, cmd.getReplicaIndex());
}
@@ -267,7 +370,7 @@ public class TestECUnderReplicationHandler {
throws IOException {
Set<ContainerReplica> availableReplicas = new LinkedHashSet<>();
ContainerReplica deadMaintenance =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
1, IN_MAINTENANCE, CLOSED);
availableReplicas.add(deadMaintenance);
@@ -295,11 +398,11 @@ public class TestECUnderReplicationHandler {
Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
testUnderReplicationWithMissingIndexes(
Lists.emptyList(), availableReplicas, 1, 0, policy);
- Assertions.assertEquals(1, cmds.size());
+ assertEquals(1, cmds.size());
// Check the replicate command has index 1 set
for (Pair<DatanodeDetails, SCMCommand<?>> c : cmds) {
// Ensure neither of the commands are for the dead maintenance node
- Assertions.assertNotEquals(deadMaintenance.getDatanodeDetails(),
+ assertNotEquals(deadMaintenance.getDatanodeDetails(),
c.getKey());
}
}
@@ -315,7 +418,7 @@ public class TestECUnderReplicationHandler {
.when(replicationManager).sendThrottledReplicationCommand(
any(), anyList(), any(), anyInt());
- Assertions.assertThrows(CommandTargetOverloadedException.class, () ->
+ assertThrows(CommandTargetOverloadedException.class, () ->
testUnderReplicationWithMissingIndexes(
Lists.emptyList(), availableReplicas, 1, 0, policy));
}
@@ -377,28 +480,27 @@ public class TestECUnderReplicationHandler {
public void testUnderReplicationWithInvalidPlacement()
throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
- .createReplicas(Pair.of(DECOMMISSIONING, 1),
- Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
- Pair.of(IN_SERVICE, 4));
- PlacementPolicy mockedPolicy = Mockito.spy(policy);
+ .createReplicas(Pair.of(DECOMMISSIONING, 1),
+ Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4));
+ PlacementPolicy mockedPolicy = spy(policy);
ContainerPlacementStatus mockedContainerPlacementStatus =
- Mockito.mock(ContainerPlacementStatus.class);
- Mockito.when(mockedContainerPlacementStatus.isPolicySatisfied())
- .thenReturn(false);
- Mockito.when(mockedPolicy.validateContainerPlacement(anyList(),
- anyInt())).thenReturn(mockedContainerPlacementStatus);
- Mockito.when(mockedPolicy.validateContainerPlacement(anyList(),
- anyInt())).thenAnswer(invocationOnMock -> {
- Set<DatanodeDetails> dns =
- new HashSet<>(invocationOnMock.getArgument(0));
- Assert.assertTrue(
- availableReplicas.stream()
- .map(ContainerReplica::getDatanodeDetails)
- .filter(dn -> dn.getPersistedOpState() == IN_SERVICE)
- .allMatch(dns::contains));
- return mockedContainerPlacementStatus;
- });
- testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+ mock(ContainerPlacementStatus.class);
+ when(mockedContainerPlacementStatus.isPolicySatisfied())
+ .thenReturn(false);
+ when(mockedPolicy.validateContainerPlacement(anyList(), anyInt()))
+ .thenReturn(mockedContainerPlacementStatus);
+ when(mockedPolicy.validateContainerPlacement(anyList(), anyInt()))
+ .thenAnswer(invocationOnMock -> {
+ Set<DatanodeDetails> dns =
+ new HashSet<>(invocationOnMock.getArgument(0));
+ assertTrue(availableReplicas.stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .filter(dn -> dn.getPersistedOpState() == IN_SERVICE)
+ .allMatch(dns::contains));
+ return mockedContainerPlacementStatus;
+ });
+ testUnderReplicationWithMissingIndexes(emptyList(),
availableReplicas, 0, 0, mockedPolicy);
}
@@ -422,13 +524,13 @@ public class TestECUnderReplicationHandler {
.getNoNodesTestPlacementPolicy(nodeManager, conf);
ContainerReplica overRepReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
4, IN_SERVICE, CLOSED);
ContainerReplica decomReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
5, DECOMMISSIONING, CLOSED);
ContainerReplica maintReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
5, ENTERING_MAINTENANCE, CLOSED);
List<ContainerReplica> replicasToAdd = new ArrayList<>();
@@ -439,8 +541,8 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH =
new ECUnderReplicationHandler(
noNodesPolicy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
1, false, false, false);
// The underRepHandler processes in stages. First missing indexes, then
@@ -453,18 +555,15 @@ public class TestECUnderReplicationHandler {
// throw an exception, and then make it also over replicated, returning the
// commands to fix the over replication instead.
for (ContainerReplica toAdd : replicasToAdd) {
- Mockito.clearInvocations(replicationManager);
- Set<ContainerReplica> availableReplicas = ReplicationTestUtil
- .createReplicas(Pair.of(IN_SERVICE, 1),
- Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
- Pair.of(IN_SERVICE, 4));
+ clearInvocations(replicationManager);
+ Set<ContainerReplica> availableReplicas = createReplicas(4);
if (toAdd != null) {
availableReplicas.add(toAdd);
}
- Assert.assertThrows(SCMException.class,
+ assertThrows(SCMException.class,
() -> ecURH.processAndSendCommands(availableReplicas,
- Collections.emptyList(), underRep, 2));
+ emptyList(), underRep, 2));
// Now adjust replicas so it is also over replicated. This time before
// throwing it should call the OverRepHandler and return whatever it
@@ -473,8 +572,8 @@ public class TestECUnderReplicationHandler {
assertThrows(SCMException.class,
() -> ecURH.processAndSendCommands(availableReplicas,
- Collections.emptyList(), underRep, 2));
- Mockito.verify(replicationManager, times(1))
+ emptyList(), underRep, 2));
+ verify(replicationManager, times(1))
.processOverReplicatedContainer(underRep);
}
}
@@ -492,10 +591,10 @@ public class TestECUnderReplicationHandler {
.getNoNodesTestPlacementPolicy(nodeManager, conf);
ContainerReplica decomReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
5, DECOMMISSIONING, CLOSED);
ContainerReplica maintReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
5, ENTERING_MAINTENANCE, CLOSED);
List<ContainerReplica> replicasToAdd = new ArrayList<>();
@@ -506,11 +605,11 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH =
new ECUnderReplicationHandler(
noNodesPolicy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
1, false, false, false);
ContainerReplica unhealthyReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
4, IN_SERVICE, UNHEALTHY);
/*
@@ -526,7 +625,7 @@ public class TestECUnderReplicationHandler {
it should return the command to delete one.
*/
for (ContainerReplica toAdd : replicasToAdd) {
- Mockito.clearInvocations(replicationManager);
+ clearInvocations(replicationManager);
Set<ContainerReplica> existingReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 5),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -536,10 +635,10 @@ public class TestECUnderReplicationHandler {
}
// should throw an SCMException indicating no targets were found
- Assert.assertThrows(SCMException.class,
+ assertThrows(SCMException.class,
() -> ecURH.processAndSendCommands(existingReplicas,
- Collections.emptyList(), underRep, 2));
- Mockito.verify(replicationManager, times(0))
+ emptyList(), underRep, 2));
+ verify(replicationManager, times(0))
.sendThrottledDeleteCommand(eq(container), anyInt(),
any(DatanodeDetails.class), anyBoolean());
@@ -550,7 +649,7 @@ public class TestECUnderReplicationHandler {
*/
existingReplicas.add(unhealthyReplica);
existingReplicas.add(
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
1, IN_SERVICE, UNHEALTHY));
/*
@@ -559,61 +658,59 @@ public class TestECUnderReplicationHandler {
assertions.
*/
commandsSent.clear();
- Mockito.doAnswer(invocation -> {
+ doAnswer(invocation -> {
commandsSent.add(Pair.of(invocation.getArgument(2),
createDeleteContainerCommand(invocation.getArgument(0),
invocation.getArgument(1))));
return null;
})
.when(replicationManager)
- .sendThrottledDeleteCommand(Mockito.any(ContainerInfo.class),
- Mockito.anyInt(), Mockito.any(DatanodeDetails.class),
- Mockito.eq(true));
+ .sendThrottledDeleteCommand(any(ContainerInfo.class),
+ anyInt(), any(DatanodeDetails.class),
+ eq(true));
assertThrows(SCMException.class,
() -> ecURH.processAndSendCommands(existingReplicas,
- Collections.emptyList(), underRep, 2));
- Mockito.verify(replicationManager, times(1))
+ emptyList(), underRep, 2));
+ verify(replicationManager, times(1))
.sendThrottledDeleteCommand(container,
unhealthyReplica.getReplicaIndex(),
unhealthyReplica.getDatanodeDetails(), true);
- Assertions.assertEquals(1, commandsSent.size());
+ assertEquals(1, commandsSent.size());
Pair<DatanodeDetails, SCMCommand<?>> command =
commandsSent.iterator().next();
- Assertions.assertEquals(SCMCommandProto.Type.deleteContainerCommand,
+ assertEquals(SCMCommandProto.Type.deleteContainerCommand,
command.getValue().getType());
DeleteContainerCommand deleteCommand =
(DeleteContainerCommand) command.getValue();
- Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(),
+ assertEquals(unhealthyReplica.getDatanodeDetails(),
command.getKey());
- Assertions.assertEquals(container.containerID(),
+ assertEquals(container.containerID(),
ContainerID.valueOf(deleteCommand.getContainerID()));
- Assertions.assertEquals(unhealthyReplica.getReplicaIndex(),
+ assertEquals(unhealthyReplica.getReplicaIndex(),
deleteCommand.getReplicaIndex());
}
}
@Test
public void testPartialReconstructionIfNotEnoughNodes() {
- Set<ContainerReplica> availableReplicas = ReplicationTestUtil
- .createReplicas(Pair.of(IN_SERVICE, 1),
- Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3));
+ Set<ContainerReplica> availableReplicas = createReplicas(3);
PlacementPolicy placementPolicy = ReplicationTestUtil
.getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
placementPolicy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
0, false, false, false);
assertThrows(InsufficientDatanodesException.class, () ->
- ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ ecURH.processAndSendCommands(availableReplicas, emptyList(),
underRep, 1));
- Assertions.assertEquals(1, commandsSent.size());
+ assertEquals(1, commandsSent.size());
ReconstructECContainersCommand cmd = (ReconstructECContainersCommand)
commandsSent.iterator().next().getValue();
- Assertions.assertEquals(1, cmd.getTargetDatanodes().size());
+ assertEquals(1, cmd.getTargetDatanodes().size());
}
@Test
@@ -624,19 +721,19 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
policy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
0, false, false, false);
// Setup so reconstruction fails, but we should still get a replicate
// command for the decommissioning node and an exception thrown.
throwOverloadedExceptionOnReconstruction.set(true);
assertThrows(CommandTargetOverloadedException.class, () ->
- ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ ecURH.processAndSendCommands(availableReplicas, emptyList(),
underRep, 1));
- Assertions.assertEquals(1, commandsSent.size());
+ assertEquals(1, commandsSent.size());
SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
- Assertions.assertEquals(
+ assertEquals(
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}
@@ -651,16 +748,16 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
placementPolicy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
0, true, false, false);
assertThrows(InsufficientDatanodesException.class, () ->
- ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ ecURH.processAndSendCommands(availableReplicas, emptyList(),
underRep, 1));
- Assertions.assertEquals(1, commandsSent.size());
+ assertEquals(1, commandsSent.size());
SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
- Assertions.assertEquals(
+ assertEquals(
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}
@@ -673,17 +770,17 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
policy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
0, true, false, false);
throwOverloadedExceptionOnReplication.set(true);
assertThrows(CommandTargetOverloadedException.class, () ->
- ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ ecURH.processAndSendCommands(availableReplicas, emptyList(),
underRep, 1));
- Assertions.assertEquals(1, commandsSent.size());
+ assertEquals(1, commandsSent.size());
SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
- Assertions.assertEquals(
+ assertEquals(
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}
@@ -699,16 +796,16 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
placementPolicy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
0, false, false, false);
assertThrows(InsufficientDatanodesException.class, () ->
- ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ ecURH.processAndSendCommands(availableReplicas, emptyList(),
underRep, 2));
- Assertions.assertEquals(1, commandsSent.size());
+ assertEquals(1, commandsSent.size());
SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
- Assertions.assertEquals(
+ assertEquals(
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}
@@ -722,17 +819,17 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
policy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
0, false, false, false);
throwOverloadedExceptionOnReplication.set(true);
assertThrows(CommandTargetOverloadedException.class, () ->
- ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ ecURH.processAndSendCommands(availableReplicas, emptyList(),
underRep, 2));
- Assertions.assertEquals(1, commandsSent.size());
+ assertEquals(1, commandsSent.size());
SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
- Assertions.assertEquals(
+ assertEquals(
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}
@@ -744,10 +841,10 @@ public class TestECUnderReplicationHandler {
.getSameNodeTestPlacementPolicy(nodeManager, conf, newNode);
ContainerReplica decomReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
5, DECOMMISSIONING, CLOSED);
ContainerReplica maintReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
5, ENTERING_MAINTENANCE, CLOSED);
List<ContainerReplica> replicasToAdd = new ArrayList<>();
@@ -757,8 +854,8 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH =
new ECUnderReplicationHandler(
sameNodePolicy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
1, false, false, false);
// The underRepHandler processes in stages. First missing indexes, then
@@ -780,18 +877,18 @@ public class TestECUnderReplicationHandler {
assertThrows(SCMException.class,
() -> ecURH.processAndSendCommands(availableReplicas,
- Collections.emptyList(), underRep, 2));
+ emptyList(), underRep, 2));
- Mockito.verify(replicationManager, times(1))
+ verify(replicationManager, times(1))
.processOverReplicatedContainer(underRep);
- Assertions.assertEquals(1, commandsSent.size());
+ assertEquals(1, commandsSent.size());
Pair<DatanodeDetails, SCMCommand<?>> pair =
commandsSent.iterator().next();
- Assertions.assertEquals(newNode, pair.getKey());
- Assertions.assertEquals(
+ assertEquals(newNode, pair.getKey());
+ assertEquals(
SCMCommandProto.Type.reconstructECContainersCommand,
pair.getValue().getType());
- Mockito.clearInvocations(replicationManager);
+ clearInvocations(replicationManager);
commandsSent.clear();
}
}
@@ -813,18 +910,18 @@ public class TestECUnderReplicationHandler {
new ECUnderReplicationHandler(
sameNodePolicy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult underRep =
- new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ UnderReplicatedHealthResult underRep =
+ new UnderReplicatedHealthResult(container,
1, true, false, false);
- Assert.assertThrows(SCMException.class,
+ assertThrows(SCMException.class,
() -> ecURH.processAndSendCommands(availableReplicas,
- Collections.emptyList(), underRep, 1));
+ emptyList(), underRep, 1));
// Now adjust replicas so it is also over replicated. This time it should
// call the OverRepHandler and then throw
ContainerReplica overRepReplica =
- ReplicationTestUtil.createContainerReplica(container.containerID(),
+ createContainerReplica(container.containerID(),
4, IN_SERVICE, CLOSED);
availableReplicas.add(overRepReplica);
@@ -833,16 +930,16 @@ public class TestECUnderReplicationHandler {
createDeleteContainerCommand(container,
overRepReplica.getReplicaIndex())));
- Mockito.when(replicationManager.processOverReplicatedContainer(
+ when(replicationManager.processOverReplicatedContainer(
underRep)).thenAnswer(invocationOnMock -> {
commandsSent.addAll(expectedDelete);
return expectedDelete.size();
});
assertThrows(SCMException.class, () -> ecURH.processAndSendCommands(
- availableReplicas, Collections.emptyList(), underRep, 1));
- Mockito.verify(replicationManager, times(1))
+ availableReplicas, emptyList(), underRep, 1));
+ verify(replicationManager, times(1))
.processOverReplicatedContainer(underRep);
- Assertions.assertEquals(true, expectedDelete.equals(commandsSent));
+ assertEquals(expectedDelete, commandsSent);
}
@Test
@@ -885,14 +982,14 @@ public class TestECUnderReplicationHandler {
Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3),
availableReplicas, 0, 0, policy);
- Assertions.assertEquals(1, cmds.size());
+ assertEquals(1, cmds.size());
ReconstructECContainersCommand cmd =
(ReconstructECContainersCommand) cmds.iterator().next().getValue();
// Ensure that all source nodes are IN_SERVICE, we should not have picked
// the non in-service nodes for index 1.
for (ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex s
: cmd.getSources()) {
- Assertions.assertEquals(
+ assertEquals(
IN_SERVICE, s.getDnDetails().getPersistedOpState());
}
}
@@ -909,8 +1006,8 @@ public class TestECUnderReplicationHandler {
Pair.of(IN_MAINTENANCE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5));
- Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
- Mockito.isNull(), anyInt(), anyLong(), anyLong()))
+ when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
+ isNull(), anyInt(), anyLong(), anyLong()))
.thenAnswer(invocationOnMock -> {
int numNodes = invocationOnMock.getArgument(3);
List<DatanodeDetails> targets = new ArrayList<>();
@@ -920,17 +1017,17 @@ public class TestECUnderReplicationHandler {
return targets;
});
- ContainerHealthResult.UnderReplicatedHealthResult result =
- Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
- Mockito.when(result.getContainerInfo()).thenReturn(container);
+ UnderReplicatedHealthResult result =
+ mock(UnderReplicatedHealthResult.class);
+ when(result.getContainerInfo()).thenReturn(container);
ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
ecPlacementPolicy, conf, replicationManager);
handler.processAndSendCommands(availableReplicas,
- Collections.emptyList(), result, 1);
- Assertions.assertEquals(1, commandsSent.size());
- Mockito.verify(ecPlacementPolicy, times(0))
- .chooseDatanodes(anyList(), Mockito.isNull(), eq(0), anyLong(),
+ emptyList(), result, 1);
+ assertEquals(1, commandsSent.size());
+ verify(ecPlacementPolicy, times(0))
+ .chooseDatanodes(anyList(), isNull(), eq(0), anyLong(),
anyLong());
}
@@ -942,9 +1039,7 @@ public class TestECUnderReplicationHandler {
@Test
public void testDatanodesPendingAddAreNotSelectedAsTargets()
throws IOException {
- Set<ContainerReplica> availableReplicas = ReplicationTestUtil
- .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3));
+ Set<ContainerReplica> availableReplicas = createReplicas(3);
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
List<ContainerReplicaOp> pendingOps = ImmutableList.of(
ContainerReplicaOp.create(ContainerReplicaOp.PendingOpType.ADD, dn,
4));
@@ -955,8 +1050,8 @@ public class TestECUnderReplicationHandler {
containing that DN. Ensures the test will fail if excludeNodes does not
contain the DN pending ADD.
*/
- Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
- Mockito.isNull(), anyInt(), anyLong(), anyLong()))
+ when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
+ isNull(), anyInt(), anyLong(), anyLong()))
.thenAnswer(invocationOnMock -> {
List<DatanodeDetails> usedList = invocationOnMock.getArgument(0);
List<DatanodeDetails> excludeList = invocationOnMock.getArgument(1);
@@ -969,15 +1064,15 @@ public class TestECUnderReplicationHandler {
return targets;
});
- ContainerHealthResult.UnderReplicatedHealthResult result =
- Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
- Mockito.when(result.getContainerInfo()).thenReturn(container);
+ UnderReplicatedHealthResult result =
+ mock(UnderReplicatedHealthResult.class);
+ when(result.getContainerInfo()).thenReturn(container);
ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
ecPlacementPolicy, conf, replicationManager);
handler.processAndSendCommands(availableReplicas, pendingOps, result, 1);
- Assertions.assertEquals(1, commandsSent.size());
- Assertions.assertNotEquals(dn, commandsSent.iterator().next().getKey());
+ assertEquals(1, commandsSent.size());
+ assertNotEquals(dn, commandsSent.iterator().next().getKey());
}
@Test
@@ -985,17 +1080,17 @@ public class TestECUnderReplicationHandler {
throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1));
- ContainerReplica decomReplica = ReplicationTestUtil.createContainerReplica(
+ ContainerReplica decomReplica = createContainerReplica(
container.containerID(), 2, DECOMMISSIONING, CLOSED);
availableReplicas.add(decomReplica);
Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
- testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+ testUnderReplicationWithMissingIndexes(emptyList(),
availableReplicas, 1, 0, policy);
- Assertions.assertEquals(1, cmds.size());
+ assertEquals(1, cmds.size());
// With push replication the command should always be sent to the
// decommissioning source.
DatanodeDetails target = cmds.iterator().next().getKey();
- Assertions.assertEquals(decomReplica.getDatanodeDetails(), target);
+ assertEquals(decomReplica.getDatanodeDetails(), target);
}
@Test
@@ -1003,25 +1098,25 @@ public class TestECUnderReplicationHandler {
throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1));
- ContainerReplica maintReplica = ReplicationTestUtil.createContainerReplica(
+ ContainerReplica maintReplica = createContainerReplica(
container.containerID(), 2, ENTERING_MAINTENANCE, CLOSED);
availableReplicas.add(maintReplica);
Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
- testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+ testUnderReplicationWithMissingIndexes(emptyList(),
availableReplicas, 0, 1, policy);
- Assertions.assertEquals(0, cmds.size());
+ assertEquals(0, cmds.size());
// Change the remaining redundancy to ensure something needs copied.
remainingMaintenanceRedundancy = 2;
- cmds = testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+ cmds = testUnderReplicationWithMissingIndexes(emptyList(),
availableReplicas, 0, 1, policy);
- Assertions.assertEquals(1, cmds.size());
+ assertEquals(1, cmds.size());
// With push replication the command should always be sent to the
// entering_maintenance source.
DatanodeDetails target = cmds.iterator().next().getKey();
- Assertions.assertEquals(maintReplica.getDatanodeDetails(), target);
+ assertEquals(maintReplica.getDatanodeDetails(), target);
}
public Set<Pair<DatanodeDetails, SCMCommand<?>>>
@@ -1032,10 +1127,10 @@ public class TestECUnderReplicationHandler {
ECUnderReplicationHandler ecURH =
new ECUnderReplicationHandler(
placementPolicy, conf, replicationManager);
- ContainerHealthResult.UnderReplicatedHealthResult result =
- Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
- Mockito.when(result.isUnrecoverable()).thenReturn(false);
- Mockito.when(result.getContainerInfo()).thenReturn(container);
+ UnderReplicatedHealthResult result =
+ mock(UnderReplicatedHealthResult.class);
+ when(result.isUnrecoverable()).thenReturn(false);
+ when(result.getContainerInfo()).thenReturn(container);
ecURH.processAndSendCommands(availableReplicas, ImmutableList.of(),
result, remainingMaintenanceRedundancy);
@@ -1054,7 +1149,7 @@ public class TestECUnderReplicationHandler {
} else if (dnCommand
.getValue() instanceof ReconstructECContainersCommand) {
if (shouldReconstructCommandExist) {
- Assertions.assertArrayEquals(missingIndexesByteArr,
+ assertArrayEquals(missingIndexesByteArr,
((ReconstructECContainersCommand) dnCommand.getValue())
.getMissingContainerIndexes());
}
@@ -1064,9 +1159,9 @@ public class TestECUnderReplicationHandler {
int maxMaintenance = PARITY - remainingMaintenanceRedundancy;
int expectedMaintenanceCommands = Math.max(0,
maintenanceIndexes - maxMaintenance);
- Assertions.assertEquals(decomIndexes + expectedMaintenanceCommands,
+ assertEquals(decomIndexes + expectedMaintenanceCommands,
replicateCommand);
- Assertions.assertEquals(shouldReconstructCommandExist ? 1 : 0,
+ assertEquals(shouldReconstructCommandExist ? 1 : 0,
reconstructCommand);
return commandsSent;
}
@@ -1078,4 +1173,53 @@ public class TestECUnderReplicationHandler {
deleteCommand.setReplicaIndex(replicaIndex);
return deleteCommand;
}
+
+ /**
+ * Helper to mock and verify calls to
+ * {@link PlacementPolicy#chooseDatanodes(List, List, int, long, long)}.
+ */
+ private static class PlacementPolicySpy {
+
+ private final List<List<DatanodeDetails>> excludedNodesLists =
+ new ArrayList<>();
+ private final List<List<DatanodeDetails>> usedNodesLists =
+ new ArrayList<>();
+
+ PlacementPolicySpy(PlacementPolicy placementPolicy, int totalNodes)
+ throws IOException {
+ when(placementPolicy.chooseDatanodes(any(), any(),
+ any(), anyInt(), anyLong(), anyLong())
+ ).thenAnswer(invocation -> {
+ final Collection<DatanodeDetails> used = invocation.getArgument(0);
+ final Collection<DatanodeDetails> excluded = invocation.getArgument(1);
+ final int nodesRequired = invocation.getArgument(3);
+
+ final int availableNodes = totalNodes - excluded.size() - used.size();
+ usedNodesLists.add(new ArrayList<>(used));
+ excludedNodesLists.add(new ArrayList<>(excluded));
+
+ final List<DatanodeDetails> targets = new ArrayList<>();
+ for (int i = 0; i < Math.min(nodesRequired, availableNodes); i++) {
+ targets.add(MockDatanodeDetails.randomDatanodeDetails());
+ }
+ if (targets.isEmpty()) {
+ throw new SCMException("not enough nodes",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return targets;
+ });
+ }
+
+ int callCount() {
+ return usedNodesLists.size();
+ }
+
+ List<DatanodeDetails> usedNodes(int i) {
+ return usedNodesLists.get(i);
+ }
+
+ List<DatanodeDetails> excludedNodes(int i) {
+ return excludedNodesLists.get(i);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]