This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b588694  HDDS-5459. Replication Manager should process containers synchronously for tests (#2429)
b588694 is described below

commit b588694da0a16aa2a7c1b0998e235edeeada1eae
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Jul 21 03:57:19 2021 +0100

    HDDS-5459. Replication Manager should process containers synchronously for tests (#2429)
---
 .../hdds/scm/container/ReplicationManager.java     | 37 ++++-----
 .../hdds/scm/container/TestReplicationManager.java | 95 +++++++++++-----------
 .../hadoop/ozone/TestStorageContainerManager.java  |  2 +-
 .../commandhandler/TestBlockDeletion.java          | 12 +--
 .../TestSCMContainerPlacementPolicyMetrics.java    |  2 +-
 5 files changed, 68 insertions(+), 80 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 104ef28..4ab69a0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -78,9 +78,7 @@ import 
org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.ExitUtil;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.GeneratedMessage;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
 
@@ -312,16 +310,6 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
   }
 
   /**
-   * Process all the containers immediately.
-   */
-  @VisibleForTesting
-  @SuppressFBWarnings(value="NN_NAKED_NOTIFY",
-      justification="Used only for testing")
-  public synchronized void processContainersNow() {
-    notifyAll();
-  }
-
-  /**
    * Stops Replication Monitor thread.
    */
   public synchronized void stop() {
@@ -341,21 +329,28 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
   }
 
   /**
+   * Process all the containers now, and wait for the processing to complete.
+   * This is intended to be used in tests.
+   */
+  public synchronized void processAll() {
+    final long start = clock.millis();
+    final List<ContainerInfo> containers =
+        containerManager.getContainers();
+    containers.forEach(this::processContainer);
+
+    LOG.info("Replication Monitor Thread took {} milliseconds for" +
+            " processing {} containers.", clock.millis() - start,
+        containers.size());
+  }
+
+  /**
    * ReplicationMonitor thread runnable. This wakes up at configured
    * interval and processes all the containers in the system.
    */
   private synchronized void run() {
     try {
       while (running) {
-        final long start = clock.millis();
-        final List<ContainerInfo> containers =
-            containerManager.getContainers();
-        containers.forEach(this::processContainer);
-
-        LOG.info("Replication Monitor Thread took {} milliseconds for" +
-                " processing {} containers.", clock.millis() - start,
-            containers.size());
-
+        processAll();
         wait(rmConf.getInterval());
       }
     } catch (Throwable t) {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index d01aafc..d92e769 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -214,9 +214,9 @@ public class TestReplicationManager {
   public void testOpenContainer() throws SCMException, InterruptedException {
     final ContainerInfo container = getContainer(LifeCycleState.OPEN);
     containerStateManager.loadContainer(container);
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
 
   }
@@ -249,9 +249,9 @@ public class TestReplicationManager {
     final int currentCloseCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
 
@@ -260,9 +260,9 @@ public class TestReplicationManager {
       containerStateManager.updateContainerReplica(id, replica);
     }
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
   }
@@ -295,9 +295,9 @@ public class TestReplicationManager {
     final int currentCloseCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
     // Two of the replicas are in OPEN state
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
     Assert.assertTrue(datanodeCommandHandler.received(
@@ -333,9 +333,9 @@ public class TestReplicationManager {
 
     // All the QUASI_CLOSED replicas have same originNodeId, so the
     // container will not be closed. ReplicationManager should take no action.
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
   }
 
@@ -374,9 +374,9 @@ public class TestReplicationManager {
 
     // All the QUASI_CLOSED replicas have same originNodeId, so the
     // container will not be closed. ReplicationManager should take no action.
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
 
     // Make the first replica unhealthy
@@ -385,9 +385,9 @@ public class TestReplicationManager {
         replicaOne.getDatanodeDetails());
     containerStateManager.updateContainerReplica(id, unhealthyReplica);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assert.assertTrue(datanodeCommandHandler.received(
@@ -398,9 +398,9 @@ public class TestReplicationManager {
     containerStateManager.removeContainerReplica(id, replicaOne);
 
     // The container is under replicated as unhealthy replica is removed
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
 
     // We should get replicate command
     Assert.assertEquals(currentReplicateCommandCount + 1,
@@ -436,9 +436,9 @@ public class TestReplicationManager {
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
   }
@@ -473,9 +473,9 @@ public class TestReplicationManager {
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assert.assertTrue(datanodeCommandHandler.received(
@@ -505,9 +505,9 @@ public class TestReplicationManager {
     final int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentReplicateCommandCount + 1,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
@@ -553,7 +553,7 @@ public class TestReplicationManager {
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
     GenericTestUtils.waitFor(
         () -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
@@ -579,9 +579,9 @@ public class TestReplicationManager {
      * iteration it should delete the unhealthy replica.
      */
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     // ReplicaTwo should be deleted, that is the unhealthy one
@@ -597,9 +597,9 @@ public class TestReplicationManager {
      * is under replicated now
      */
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentReplicateCommandCount + 2,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
@@ -629,9 +629,9 @@ public class TestReplicationManager {
     final int currentCloseCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
 
     // All the replicas have same BCSID, so all of them will be closed.
     Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
@@ -659,9 +659,9 @@ public class TestReplicationManager {
       containerStateManager.updateContainerReplica(id, replica);
     }
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
   }
 
@@ -687,10 +687,9 @@ public class TestReplicationManager {
         Mockito.mock(CloseContainerEventHandler.class);
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
 
-    replicationManager.processContainersNow();
-
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Mockito.verify(closeContainerHandler, Mockito.times(1))
         .onMessage(id, eventQueue);
   }
@@ -738,9 +737,9 @@ public class TestReplicationManager {
     int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     // At this stage, due to the mocked calls to validteContainerPlacement
     // the mis-replicated racks will not have improved, so expect to see nothing
     // scheduled.
@@ -760,9 +759,9 @@ public class TestReplicationManager {
     currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     // At this stage, due to the mocked calls to validteContainerPlacement
     // the mis-replicated racks will not have improved, so expect to see nothing
     // scheduled.
@@ -806,9 +805,9 @@ public class TestReplicationManager {
     int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     // The unhealthy replica should be removed, but not the other replica
     // as each time we test with 3 replicas, Mockitor ensures it returns
     // mis-replicated
@@ -850,9 +849,9 @@ public class TestReplicationManager {
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
   }
@@ -890,9 +889,9 @@ public class TestReplicationManager {
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
   }
@@ -1057,9 +1056,9 @@ public class TestReplicationManager {
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     // Get the DECOM and Maint replica and ensure none of them are scheduled
@@ -1401,9 +1400,9 @@ public class TestReplicationManager {
     final int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
-    replicationManager.processContainersNow();
+    replicationManager.processAll();
     // Wait for EventQueue to call the event handler
-    Thread.sleep(100L);
+    eventQueue.processAll(1000);
     Assert.assertEquals(currentReplicateCommandCount + delta,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 108ccf8..26afe7e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -807,7 +807,7 @@ public class TestStorageContainerManager {
       Thread.sleep(5000);
       // Give ReplicationManager some time to process the containers.
       cluster.getStorageContainerManager()
-          .getReplicationManager().processContainersNow();
+          .getReplicationManager().processAll();
       Thread.sleep(5000);
 
       verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 7941c7b..d8e80b4 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -335,9 +335,7 @@ public class TestBlockDeletion {
     });
 
     cluster.shutdownHddsDatanode(0);
-    scm.getReplicationManager().processContainersNow();
-    // Wait for container state change to DELETING
-    Thread.sleep(100);
+    scm.getReplicationManager().processAll();
     containerInfos = scm.getContainerManager().getContainers();
     containerInfos.stream().forEach(container ->
         Assert.assertEquals(HddsProtos.LifeCycleState.DELETING,
@@ -346,17 +344,13 @@ public class TestBlockDeletion {
         LogCapturer.captureLogs(ReplicationManager.LOG);
     logCapturer.clearOutput();
 
-    scm.getReplicationManager().processContainersNow();
-    Thread.sleep(100);
-    // Wait for delete replica command resend
+    scm.getReplicationManager().processAll();
     GenericTestUtils.waitFor(() -> logCapturer.getOutput()
         .contains("Resend delete Container"), 500, 5000);
     cluster.restartHddsDatanode(0, true);
     Thread.sleep(100);
 
-    scm.getReplicationManager().processContainersNow();
-    // Wait for container state change to DELETED
-    Thread.sleep(100);
+    scm.getReplicationManager().processAll();
     containerInfos = scm.getContainerManager().getContainers();
     containerInfos.stream().forEach(container -> {
       Assert.assertEquals(HddsProtos.LifeCycleState.DELETED,
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 8b55e1b..6243e4b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -135,7 +135,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
     } catch (InterruptedException e) {
     }
     cluster.getStorageContainerManager().getReplicationManager()
-        .processContainersNow();
+        .processAll();
     try {
       Thread.sleep(30 * 1000);
     } catch (InterruptedException e) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to