This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 52db80f920 HDDS-8335. ReplicationManager: EC Mis and Under replication 
handlers should handle overloaded exceptions (#4593)
52db80f920 is described below

commit 52db80f920dcb11eb231e1cc98c909f3b0638b25
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Apr 20 13:23:58 2023 +0100

    HDDS-8335. ReplicationManager: EC Mis and Under replication handlers should 
handle overloaded exceptions (#4593)
---
 .../replication/ECMisReplicationHandler.java       | 35 +++++----
 .../replication/ECUnderReplicationHandler.java     | 53 +++++++++++---
 .../container/replication/ReplicationTestUtil.java | 21 +++++-
 .../replication/TestECMisReplicationHandler.java   | 12 ++++
 .../replication/TestECUnderReplicationHandler.java | 82 +++++++++++++++++++++-
 .../replication/TestMisReplicationHandler.java     |  8 ++-
 .../TestRatisUnderReplicationHandler.java          |  3 +-
 7 files changed, 186 insertions(+), 28 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
index 79228bfb67..bc77a6f6a8 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
@@ -65,6 +65,7 @@ public class ECMisReplicationHandler extends 
MisReplicationHandler {
     ReplicationManager replicationManager = getReplicationManager();
     int commandsSent = 0;
     int datanodeIdx = 0;
+    CommandTargetOverloadedException overloadedException = null;
     for (ContainerReplica replica : replicasToBeReplicated) {
       if (datanodeIdx == targetDns.size()) {
         break;
@@ -72,21 +73,31 @@ public class ECMisReplicationHandler extends 
MisReplicationHandler {
       long containerID = containerInfo.getContainerID();
       DatanodeDetails source = replica.getDatanodeDetails();
       DatanodeDetails target = targetDns.get(datanodeIdx);
-      if (replicationManager.getConfig().isPush()) {
-        replicationManager.sendThrottledReplicationCommand(containerInfo,
-            Collections.singletonList(source), target,
-            replica.getReplicaIndex());
-      } else {
-        ReplicateContainerCommand cmd = ReplicateContainerCommand
-            .fromSources(containerID, Collections.singletonList(source));
-        // For EC containers, we need to track the replica index which is
-        // to be replicated, so add it to the command.
-        cmd.setReplicaIndex(replica.getReplicaIndex());
-        replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
+      try {
+        if (replicationManager.getConfig().isPush()) {
+          replicationManager.sendThrottledReplicationCommand(containerInfo,
+              Collections.singletonList(source), target,
+              replica.getReplicaIndex());
+        } else {
+          ReplicateContainerCommand cmd = ReplicateContainerCommand
+              .fromSources(containerID, Collections.singletonList(source));
+          // For EC containers, we need to track the replica index which is
+          // to be replicated, so add it to the command.
+          cmd.setReplicaIndex(replica.getReplicaIndex());
+          replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
+        }
+        commandsSent++;
+      } catch (CommandTargetOverloadedException e) {
+        LOG.debug("Unable to replicate container {} and index {} from {} to {}"
+                + " because the source is overloaded",
+            containerID, replica.getReplicaIndex(), source, target);
+        overloadedException = e;
       }
-      commandsSent++;
       datanodeIdx += 1;
     }
+    if (overloadedException != null) {
+      throw overloadedException;
+    }
     return commandsSent;
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index f07b1c5a6f..e54102b2e6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -159,17 +159,19 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
               .collect(Collectors.toList());
 
       try {
-        InsufficientDatanodesException firstException = null;
+        IOException firstException = null;
         try {
           commandsSent += processMissingIndexes(replicaCount, sources,
               availableSourceNodes, excludedNodes);
-        } catch (InsufficientDatanodesException e) {
+        } catch (InsufficientDatanodesException
+            | CommandTargetOverloadedException  e) {
           firstException = e;
         }
         try {
           commandsSent += processDecommissioningIndexes(replicaCount, sources,
               availableSourceNodes, excludedNodes);
-        } catch (InsufficientDatanodesException e) {
+        } catch (InsufficientDatanodesException
+            | CommandTargetOverloadedException e) {
           if (firstException == null) {
             firstException = e;
           }
@@ -177,7 +179,8 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
         try {
           commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
               excludedNodes);
-        } catch (InsufficientDatanodesException e) {
+        } catch (InsufficientDatanodesException
+            | CommandTargetOverloadedException e) {
           if (firstException == null) {
             firstException = e;
           }
@@ -313,6 +316,11 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
                 sourceDatanodesWithIndex, selectedDatanodes,
                 int2byte(missingIndexes),
                 repConfig);
+        // This can throw a CommandTargetOverloadedException, but there is no
+        // point in retrying here. The sources we picked already have the
+        // overloaded nodes excluded, so we should not get an overloaded
+        // exception, but it could happen due to other threads adding work to
+        // the DNs. If it happens here, we just let the exception bubble up.
         replicationManager.sendThrottledReconstructionCommand(
             container, reconstructionCommand);
         for (int i = 0; i < missingIndexes.size(); i++) {
@@ -363,6 +371,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
         excludedNodes.addAll(selectedDatanodes);
         Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
         // In this case we need to do one to one copy.
+        CommandTargetOverloadedException overloadedException = null;
         for (Integer decomIndex : decomIndexes) {
           Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
           if (source == null) {
@@ -380,9 +389,20 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
                 selectedDatanodes, excludedNodes, decomIndexes);
             break;
           }
-          createReplicateCommand(
-              container, iterator, sourceReplica, replicaCount);
-          commandsSent++;
+          try {
+            createReplicateCommand(
+                container, iterator, sourceReplica, replicaCount);
+            commandsSent++;
+          } catch (CommandTargetOverloadedException e) {
+            LOG.debug("Unable to send Replicate command for container {}" +
+                " index {} because the source node {} is overloaded.",
+                container.getContainerID(), sourceReplica.getReplicaIndex(),
+                sourceReplica.getDatanodeDetails());
+            overloadedException = e;
+          }
+        }
+        if (overloadedException != null) {
+          throw overloadedException;
         }
       }
       if (selectedDatanodes.size() != decomIndexes.size()) {
@@ -432,6 +452,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     int commandsSent = 0;
     // copy replica from source maintenance DN to a target DN
 
+    CommandTargetOverloadedException overloadedException = null;
     for (Integer maintIndex : maintIndexes) {
       if (additionalMaintenanceCopiesNeeded <= 0) {
         break;
@@ -452,9 +473,21 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
             targets, excludedNodes, maintIndexes);
         break;
       }
-      createReplicateCommand(container, iterator, sourceReplica, replicaCount);
-      commandsSent++;
-      additionalMaintenanceCopiesNeeded -= 1;
+      try {
+        createReplicateCommand(
+            container, iterator, sourceReplica, replicaCount);
+        commandsSent++;
+        additionalMaintenanceCopiesNeeded -= 1;
+      } catch (CommandTargetOverloadedException e) {
+        LOG.debug("Unable to send Replicate command for container {}" +
+            " index {} because the source node {} is overloaded.",
+            container.getContainerID(), sourceReplica.getReplicaIndex(),
+            sourceReplica.getDatanodeDetails());
+        overloadedException = e;
+      }
+    }
+    if (overloadedException != null) {
+      throw overloadedException;
     }
     if (targets.size() != maintIndexes.size()) {
       LOG.debug("Insufficient nodes were returned from the placement policy" +
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index bdfb89c53c..9d520d69ac 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -47,6 +47,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
@@ -355,13 +356,21 @@ public final class ReplicationTestUtil {
    * to the commandsSent set.
    * @param mock Mock of ReplicationManager
    * @param commandsSent Set to add the command to rather than sending it.
+   * @param throwOverloaded If the atomic boolean is true, throw a
+   *                        CommandTargetOverloadedException and set the 
boolean
+   *                        to false, instead of creating the replicate 
command.
    * @throws NotLeaderException
    * @throws CommandTargetOverloadedException
    */
   public static void mockRMSendThrottleReplicateCommand(ReplicationManager 
mock,
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent,
+      AtomicBoolean throwOverloaded)
       throws NotLeaderException, CommandTargetOverloadedException {
     doAnswer((Answer<Void>) invocationOnMock -> {
+      if (throwOverloaded.get()) {
+        throwOverloaded.set(false);
+        throw new CommandTargetOverloadedException("Overloaded");
+      }
       List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
       ContainerInfo containerInfo = invocationOnMock.getArgument(0);
       ReplicateContainerCommand command = ReplicateContainerCommand
@@ -381,14 +390,22 @@ public final class ReplicationTestUtil {
    * created to the commandsSent set.
    * @param mock Mock of ReplicationManager
    * @param commandsSent Set to add the command to rather than sending it.
+   * @param throwOverloaded If the atomic boolean is true, throw a
+   *                        CommandTargetOverloadedException and set the 
boolean
+   *                        to false, instead of recording the command.
    * @throws NotLeaderException
    * @throws CommandTargetOverloadedException
    */
   public static void mockSendThrottledReconstructionCommand(
       ReplicationManager mock,
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent,
+      AtomicBoolean throwOverloaded)
       throws NotLeaderException, CommandTargetOverloadedException {
     doAnswer((Answer<Void>) invocationOnMock -> {
+      if (throwOverloaded.get()) {
+        throwOverloaded.set(false);
+        throw new CommandTargetOverloadedException("Overloaded");
+      }
       ReconstructECContainersCommand cmd = invocationOnMock.getArgument(1);
       commandsSent.add(Pair.of(cmd.getTargetDatanodes().get(0), cmd));
       return null;
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
index f8d119ab87..1bf271fd11 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
@@ -188,6 +188,18 @@ public class TestECMisReplicationHandler extends 
TestMisReplicationHandler {
             Collections.emptyList(), 0, 1, 1, 0));
   }
 
+  @Test
+  public void testFirstSourcesOverloaded() {
+    setThrowThrottledException(true);
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+            Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+            Pair.of(IN_SERVICE, 5));
+    assertThrows(CommandTargetOverloadedException.class,
+        () -> testMisReplication(availableReplicas, mockPlacementPolicy(),
+            Collections.emptyList(), 0, 2, 2, 1));
+  }
+
   @Test
   public void commandsForFewerThanRequiredNodes() throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 09f10bdf5f..af486d257d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -60,6 +60,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singleton;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
@@ -94,6 +95,10 @@ public class TestECUnderReplicationHandler {
   private PlacementPolicy ecPlacementPolicy;
   private int remainingMaintenanceRedundancy = 1;
   private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
+  private AtomicBoolean throwOverloadedExceptionOnReplication
+      = new AtomicBoolean(false);
+  private AtomicBoolean throwOverloadedExceptionOnReconstruction
+      = new AtomicBoolean(false);
 
   @BeforeEach
   public void setup() throws NodeNotFoundException,
@@ -122,9 +127,11 @@ public class TestECUnderReplicationHandler {
     ReplicationTestUtil.mockRMSendDatanodeCommand(
         replicationManager, commandsSent);
     ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
-        replicationManager, commandsSent);
+        replicationManager, commandsSent,
+        throwOverloadedExceptionOnReplication);
     ReplicationTestUtil.mockSendThrottledReconstructionCommand(
-        replicationManager, commandsSent);
+        replicationManager, commandsSent,
+        throwOverloadedExceptionOnReconstruction);
 
     conf = SCMTestUtils.getConf();
     repConfig = new ECReplicationConfig(DATA, PARITY);
@@ -494,6 +501,30 @@ public class TestECUnderReplicationHandler {
     Assertions.assertEquals(1, cmd.getTargetDatanodes().size());
   }
 
+  @Test
+  public void testOverloadedReconstructionContinuesNextStages() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(DECOMMISSIONING, 3));
+    ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+        policy, conf, replicationManager);
+
+    ContainerHealthResult.UnderReplicatedHealthResult underRep =
+        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+            0, false, false, false);
+
+    // Set up so reconstruction fails, but we should still get a replicate
+    // command for the decommissioning node and an exception thrown.
+    throwOverloadedExceptionOnReconstruction.set(true);
+    assertThrows(CommandTargetOverloadedException.class, () ->
+        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+            underRep, 1));
+    Assertions.assertEquals(1, commandsSent.size());
+    SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+    Assertions.assertEquals(
+        SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+  }
+
   @Test
   public void testPartialDecommissionIfNotEnoughNodes() {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
@@ -518,6 +549,29 @@ public class TestECUnderReplicationHandler {
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
   }
 
+  @Test
+  public void testPartialDecommissionOverloadedNodes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5));
+    ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+        policy, conf, replicationManager);
+
+    ContainerHealthResult.UnderReplicatedHealthResult underRep =
+        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+            0, true, false, false);
+
+    throwOverloadedExceptionOnReplication.set(true);
+    assertThrows(CommandTargetOverloadedException.class, () ->
+        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+            underRep, 1));
+    Assertions.assertEquals(1, commandsSent.size());
+    SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+    Assertions.assertEquals(
+        SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+  }
+
   @Test
   public void testPartialMaintenanceIfNotEnoughNodes() {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
@@ -543,6 +597,30 @@ public class TestECUnderReplicationHandler {
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
   }
 
+  @Test
+  public void testPartialMaintenanceOverloadedNodes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(ENTERING_MAINTENANCE, 4),
+            Pair.of(ENTERING_MAINTENANCE, 5));
+    ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+        policy, conf, replicationManager);
+
+    ContainerHealthResult.UnderReplicatedHealthResult underRep =
+        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+            0, false, false, false);
+
+    throwOverloadedExceptionOnReplication.set(true);
+    assertThrows(CommandTargetOverloadedException.class, () ->
+        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+            underRep, 2));
+    Assertions.assertEquals(1, commandsSent.size());
+    SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+    Assertions.assertEquals(
+        SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+  }
+
   @Test
   public void testUnderRepWithDecommissionAndNotEnoughNodes()
       throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index e36c7d88ce..d5f50c23f6 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -69,6 +70,7 @@ public abstract class TestMisReplicationHandler {
   private OzoneConfiguration conf;
   private ReplicationManager replicationManager;
   private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
+  private AtomicBoolean throwThrottledException = new AtomicBoolean(false);
 
   protected void setup(ReplicationConfig repConfig)
       throws NodeNotFoundException, CommandTargetOverloadedException,
@@ -91,7 +93,7 @@ public abstract class TestMisReplicationHandler {
     ReplicationTestUtil.mockRMSendDatanodeCommand(
         replicationManager, commandsSent);
     ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
-        replicationManager, commandsSent);
+        replicationManager, commandsSent, throwThrottledException);
 
     container = ReplicationTestUtil
             .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
@@ -104,6 +106,10 @@ public abstract class TestMisReplicationHandler {
     return replicationManager;
   }
 
+  protected void setThrowThrottledException(boolean showThrow) {
+    throwThrottledException.set(showThrow);
+  }
+
   static PlacementPolicy<?> mockPlacementPolicy() {
     PlacementPolicy<?> placementPolicy = Mockito.mock(PlacementPolicy.class);
     ContainerPlacementStatus mockedContainerPlacementStatus =
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index a2f9bdcfd8..299e0e16e0 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -48,6 +48,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
@@ -96,7 +97,7 @@ public class TestRatisUnderReplicationHandler {
 
     commandsSent = new HashSet<>();
     ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
-        replicationManager, commandsSent);
+        replicationManager, commandsSent, new AtomicBoolean(false));
     ReplicationTestUtil.mockRMSendDatanodeCommand(replicationManager,
         commandsSent);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to