This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 52db80f920 HDDS-8335. ReplicationManager: EC Mis and Under replication
handlers should handle overloaded exceptions (#4593)
52db80f920 is described below
commit 52db80f920dcb11eb231e1cc98c909f3b0638b25
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Apr 20 13:23:58 2023 +0100
HDDS-8335. ReplicationManager: EC Mis and Under replication handlers should
handle overloaded exceptions (#4593)
---
.../replication/ECMisReplicationHandler.java | 35 +++++----
.../replication/ECUnderReplicationHandler.java | 53 +++++++++++---
.../container/replication/ReplicationTestUtil.java | 21 +++++-
.../replication/TestECMisReplicationHandler.java | 12 ++++
.../replication/TestECUnderReplicationHandler.java | 82 +++++++++++++++++++++-
.../replication/TestMisReplicationHandler.java | 8 ++-
.../TestRatisUnderReplicationHandler.java | 3 +-
7 files changed, 186 insertions(+), 28 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
index 79228bfb67..bc77a6f6a8 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
@@ -65,6 +65,7 @@ public class ECMisReplicationHandler extends
MisReplicationHandler {
ReplicationManager replicationManager = getReplicationManager();
int commandsSent = 0;
int datanodeIdx = 0;
+ CommandTargetOverloadedException overloadedException = null;
for (ContainerReplica replica : replicasToBeReplicated) {
if (datanodeIdx == targetDns.size()) {
break;
@@ -72,21 +73,31 @@ public class ECMisReplicationHandler extends
MisReplicationHandler {
long containerID = containerInfo.getContainerID();
DatanodeDetails source = replica.getDatanodeDetails();
DatanodeDetails target = targetDns.get(datanodeIdx);
- if (replicationManager.getConfig().isPush()) {
- replicationManager.sendThrottledReplicationCommand(containerInfo,
- Collections.singletonList(source), target,
- replica.getReplicaIndex());
- } else {
- ReplicateContainerCommand cmd = ReplicateContainerCommand
- .fromSources(containerID, Collections.singletonList(source));
- // For EC containers, we need to track the replica index which is
- // to be replicated, so add it to the command.
- cmd.setReplicaIndex(replica.getReplicaIndex());
- replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
+ try {
+ if (replicationManager.getConfig().isPush()) {
+ replicationManager.sendThrottledReplicationCommand(containerInfo,
+ Collections.singletonList(source), target,
+ replica.getReplicaIndex());
+ } else {
+ ReplicateContainerCommand cmd = ReplicateContainerCommand
+ .fromSources(containerID, Collections.singletonList(source));
+ // For EC containers, we need to track the replica index which is
+ // to be replicated, so add it to the command.
+ cmd.setReplicaIndex(replica.getReplicaIndex());
+ replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
+ }
+ commandsSent++;
+ } catch (CommandTargetOverloadedException e) {
+ LOG.debug("Unable to replicate container {} and index {} from {} to {}"
+ + " because the source is overloaded",
+ containerID, replica.getReplicaIndex(), source, target);
+ overloadedException = e;
}
- commandsSent++;
datanodeIdx += 1;
}
+ if (overloadedException != null) {
+ throw overloadedException;
+ }
return commandsSent;
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index f07b1c5a6f..e54102b2e6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -159,17 +159,19 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
.collect(Collectors.toList());
try {
- InsufficientDatanodesException firstException = null;
+ IOException firstException = null;
try {
commandsSent += processMissingIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
- } catch (InsufficientDatanodesException e) {
+ } catch (InsufficientDatanodesException
+ | CommandTargetOverloadedException e) {
firstException = e;
}
try {
commandsSent += processDecommissioningIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
- } catch (InsufficientDatanodesException e) {
+ } catch (InsufficientDatanodesException
+ | CommandTargetOverloadedException e) {
if (firstException == null) {
firstException = e;
}
@@ -177,7 +179,8 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
try {
commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
excludedNodes);
- } catch (InsufficientDatanodesException e) {
+ } catch (InsufficientDatanodesException
+ | CommandTargetOverloadedException e) {
if (firstException == null) {
firstException = e;
}
@@ -313,6 +316,11 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
sourceDatanodesWithIndex, selectedDatanodes,
int2byte(missingIndexes),
repConfig);
+ // This can throw a CommandTargetOverloadedException, but there is no
+ // point in retrying here. The sources we picked already have the
+ // overloaded nodes excluded, so we should not get an overloaded
+ // exception, but it could happen due to other threads adding work to
+ // the DNs. If it happens here, we just let the exception bubble up.
replicationManager.sendThrottledReconstructionCommand(
container, reconstructionCommand);
for (int i = 0; i < missingIndexes.size(); i++) {
@@ -363,6 +371,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
excludedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
// In this case we need to do one to one copy.
+ CommandTargetOverloadedException overloadedException = null;
for (Integer decomIndex : decomIndexes) {
Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
if (source == null) {
@@ -380,9 +389,20 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
selectedDatanodes, excludedNodes, decomIndexes);
break;
}
- createReplicateCommand(
- container, iterator, sourceReplica, replicaCount);
- commandsSent++;
+ try {
+ createReplicateCommand(
+ container, iterator, sourceReplica, replicaCount);
+ commandsSent++;
+ } catch (CommandTargetOverloadedException e) {
+ LOG.debug("Unable to send Replicate command for container {}" +
+ " index {} because the source node {} is overloaded.",
+ container.getContainerID(), sourceReplica.getReplicaIndex(),
+ sourceReplica.getDatanodeDetails());
+ overloadedException = e;
+ }
+ }
+ if (overloadedException != null) {
+ throw overloadedException;
}
}
if (selectedDatanodes.size() != decomIndexes.size()) {
@@ -432,6 +452,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
int commandsSent = 0;
// copy replica from source maintenance DN to a target DN
+ CommandTargetOverloadedException overloadedException = null;
for (Integer maintIndex : maintIndexes) {
if (additionalMaintenanceCopiesNeeded <= 0) {
break;
@@ -452,9 +473,21 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
targets, excludedNodes, maintIndexes);
break;
}
- createReplicateCommand(container, iterator, sourceReplica, replicaCount);
- commandsSent++;
- additionalMaintenanceCopiesNeeded -= 1;
+ try {
+ createReplicateCommand(
+ container, iterator, sourceReplica, replicaCount);
+ commandsSent++;
+ additionalMaintenanceCopiesNeeded -= 1;
+ } catch (CommandTargetOverloadedException e) {
+ LOG.debug("Unable to send Replicate command for container {}" +
+ " index {} because the source node {} is overloaded.",
+ container.getContainerID(), sourceReplica.getReplicaIndex(),
+ sourceReplica.getDatanodeDetails());
+ overloadedException = e;
+ }
+ }
+ if (overloadedException != null) {
+ throw overloadedException;
}
if (targets.size() != maintIndexes.size()) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index bdfb89c53c..9d520d69ac 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -47,6 +47,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
@@ -355,13 +356,21 @@ public final class ReplicationTestUtil {
* to the commandsSent set.
* @param mock Mock of ReplicationManager
* @param commandsSent Set to add the command to rather than sending it.
+ * @param throwOverloaded If the atomic boolean is true, throw a
+ * CommandTargetOverloadedException and set the
boolean
+ * to false, instead of creating the replicate
command.
* @throws NotLeaderException
* @throws CommandTargetOverloadedException
*/
public static void mockRMSendThrottleReplicateCommand(ReplicationManager
mock,
- Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent,
+ AtomicBoolean throwOverloaded)
throws NotLeaderException, CommandTargetOverloadedException {
doAnswer((Answer<Void>) invocationOnMock -> {
+ if (throwOverloaded.get()) {
+ throwOverloaded.set(false);
+ throw new CommandTargetOverloadedException("Overloaded");
+ }
List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
ContainerInfo containerInfo = invocationOnMock.getArgument(0);
ReplicateContainerCommand command = ReplicateContainerCommand
@@ -381,14 +390,22 @@ public final class ReplicationTestUtil {
* created to the commandsSent set.
* @param mock Mock of ReplicationManager
* @param commandsSent Set to add the command to rather than sending it.
+ * @param throwOverloaded If the atomic boolean is true, throw a
+ * CommandTargetOverloadedException and set the
boolean
+ * to false, instead of creating the replicate
command.
* @throws NotLeaderException
* @throws CommandTargetOverloadedException
*/
public static void mockSendThrottledReconstructionCommand(
ReplicationManager mock,
- Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent,
+ AtomicBoolean throwOverloaded)
throws NotLeaderException, CommandTargetOverloadedException {
doAnswer((Answer<Void>) invocationOnMock -> {
+ if (throwOverloaded.get()) {
+ throwOverloaded.set(false);
+ throw new CommandTargetOverloadedException("Overloaded");
+ }
ReconstructECContainersCommand cmd = invocationOnMock.getArgument(1);
commandsSent.add(Pair.of(cmd.getTargetDatanodes().get(0), cmd));
return null;
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
index f8d119ab87..1bf271fd11 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
@@ -188,6 +188,18 @@ public class TestECMisReplicationHandler extends
TestMisReplicationHandler {
Collections.emptyList(), 0, 1, 1, 0));
}
+ @Test
+ public void testFirstSourcesOverloaded() {
+ setThrowThrottledException(true);
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+ Pair.of(IN_SERVICE, 5));
+ assertThrows(CommandTargetOverloadedException.class,
+ () -> testMisReplication(availableReplicas, mockPlacementPolicy(),
+ Collections.emptyList(), 0, 2, 2, 1));
+ }
+
@Test
public void commandsForFewerThanRequiredNodes() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 09f10bdf5f..af486d257d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -60,6 +60,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.singleton;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
@@ -94,6 +95,10 @@ public class TestECUnderReplicationHandler {
private PlacementPolicy ecPlacementPolicy;
private int remainingMaintenanceRedundancy = 1;
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
+ private AtomicBoolean throwOverloadedExceptionOnReplication
+ = new AtomicBoolean(false);
+ private AtomicBoolean throwOverloadedExceptionOnReconstruction
+ = new AtomicBoolean(false);
@BeforeEach
public void setup() throws NodeNotFoundException,
@@ -122,9 +127,11 @@ public class TestECUnderReplicationHandler {
ReplicationTestUtil.mockRMSendDatanodeCommand(
replicationManager, commandsSent);
ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
- replicationManager, commandsSent);
+ replicationManager, commandsSent,
+ throwOverloadedExceptionOnReplication);
ReplicationTestUtil.mockSendThrottledReconstructionCommand(
- replicationManager, commandsSent);
+ replicationManager, commandsSent,
+ throwOverloadedExceptionOnReconstruction);
conf = SCMTestUtils.getConf();
repConfig = new ECReplicationConfig(DATA, PARITY);
@@ -494,6 +501,30 @@ public class TestECUnderReplicationHandler {
Assertions.assertEquals(1, cmd.getTargetDatanodes().size());
}
+ @Test
+ public void testOverloadedReconstructionContinuesNextStages() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(DECOMMISSIONING, 3));
+ ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+ policy, conf, replicationManager);
+
+ ContainerHealthResult.UnderReplicatedHealthResult underRep =
+ new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ 0, false, false, false);
+
+ // Setup so reconstruction fails, but we should still get a replicate
+ // command for the decommissioning node and an exception thrown.
+ throwOverloadedExceptionOnReconstruction.set(true);
+ assertThrows(CommandTargetOverloadedException.class, () ->
+ ecURH.processAndSendCommands(availableReplicas,
Collections.emptyList(),
+ underRep, 1));
+ Assertions.assertEquals(1, commandsSent.size());
+ SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+ Assertions.assertEquals(
+ SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+ }
+
@Test
public void testPartialDecommissionIfNotEnoughNodes() {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
@@ -518,6 +549,29 @@ public class TestECUnderReplicationHandler {
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}
+ @Test
+ public void testPartialDecommissionOverloadedNodes() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5));
+ ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+ policy, conf, replicationManager);
+
+ ContainerHealthResult.UnderReplicatedHealthResult underRep =
+ new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ 0, true, false, false);
+
+ throwOverloadedExceptionOnReplication.set(true);
+ assertThrows(CommandTargetOverloadedException.class, () ->
+ ecURH.processAndSendCommands(availableReplicas,
Collections.emptyList(),
+ underRep, 1));
+ Assertions.assertEquals(1, commandsSent.size());
+ SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+ Assertions.assertEquals(
+ SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+ }
+
@Test
public void testPartialMaintenanceIfNotEnoughNodes() {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
@@ -543,6 +597,30 @@ public class TestECUnderReplicationHandler {
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}
+ @Test
+ public void testPartialMaintenanceOverloadedNodes() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(ENTERING_MAINTENANCE, 4),
+ Pair.of(ENTERING_MAINTENANCE, 5));
+ ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+ policy, conf, replicationManager);
+
+ ContainerHealthResult.UnderReplicatedHealthResult underRep =
+ new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ 0, false, false, false);
+
+ throwOverloadedExceptionOnReplication.set(true);
+ assertThrows(CommandTargetOverloadedException.class, () ->
+ ecURH.processAndSendCommands(availableReplicas,
Collections.emptyList(),
+ underRep, 2));
+ Assertions.assertEquals(1, commandsSent.size());
+ SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+ Assertions.assertEquals(
+ SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+ }
+
@Test
public void testUnderRepWithDecommissionAndNotEnoughNodes()
throws IOException {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index e36c7d88ce..d5f50c23f6 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -69,6 +70,7 @@ public abstract class TestMisReplicationHandler {
private OzoneConfiguration conf;
private ReplicationManager replicationManager;
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
+ private AtomicBoolean throwThrottledException = new AtomicBoolean(false);
protected void setup(ReplicationConfig repConfig)
throws NodeNotFoundException, CommandTargetOverloadedException,
@@ -91,7 +93,7 @@ public abstract class TestMisReplicationHandler {
ReplicationTestUtil.mockRMSendDatanodeCommand(
replicationManager, commandsSent);
ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
- replicationManager, commandsSent);
+ replicationManager, commandsSent, throwThrottledException);
container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
@@ -104,6 +106,10 @@ public abstract class TestMisReplicationHandler {
return replicationManager;
}
+ protected void setThrowThrottledException(boolean showThrow) {
+ throwThrottledException.set(showThrow);
+ }
+
static PlacementPolicy<?> mockPlacementPolicy() {
PlacementPolicy<?> placementPolicy = Mockito.mock(PlacementPolicy.class);
ContainerPlacementStatus mockedContainerPlacementStatus =
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index a2f9bdcfd8..299e0e16e0 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -48,6 +48,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
@@ -96,7 +97,7 @@ public class TestRatisUnderReplicationHandler {
commandsSent = new HashSet<>();
ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
- replicationManager, commandsSent);
+ replicationManager, commandsSent, new AtomicBoolean(false));
ReplicationTestUtil.mockRMSendDatanodeCommand(replicationManager,
commandsSent);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]