This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b588694 HDDS-5459. Replication Manager should process containers synchronously for tests (#2429)
b588694 is described below
commit b588694da0a16aa2a7c1b0998e235edeeada1eae
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Jul 21 03:57:19 2021 +0100
HDDS-5459. Replication Manager should process containers synchronously for tests (#2429)
---
.../hdds/scm/container/ReplicationManager.java | 37 ++++-----
.../hdds/scm/container/TestReplicationManager.java | 95 +++++++++++-----------
.../hadoop/ozone/TestStorageContainerManager.java | 2 +-
.../commandhandler/TestBlockDeletion.java | 12 +--
.../TestSCMContainerPlacementPolicyMetrics.java | 2 +-
5 files changed, 68 insertions(+), 80 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 104ef28..4ab69a0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -78,9 +78,7 @@ import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
-import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.GeneratedMessage;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
@@ -312,16 +310,6 @@ public class ReplicationManager implements MetricsSource, SCMService {
}
/**
- * Process all the containers immediately.
- */
- @VisibleForTesting
- @SuppressFBWarnings(value="NN_NAKED_NOTIFY",
- justification="Used only for testing")
- public synchronized void processContainersNow() {
- notifyAll();
- }
-
- /**
* Stops Replication Monitor thread.
*/
public synchronized void stop() {
@@ -341,21 +329,28 @@ public class ReplicationManager implements MetricsSource, SCMService {
}
/**
+ * Process all the containers now, synchronously in the calling thread.
+ * Called by the monitor thread on each interval and directly by tests.
+ */
+ public synchronized void processAll() {
+ final long start = clock.millis();
+ final List<ContainerInfo> containers =
+ containerManager.getContainers();
+ containers.forEach(this::processContainer);
+
+ LOG.info("Replication Monitor Thread took {} milliseconds for" +
+ " processing {} containers.", clock.millis() - start,
+ containers.size());
+ }
+
+ /**
* ReplicationMonitor thread runnable. This wakes up at configured
* interval and processes all the containers in the system.
*/
private synchronized void run() {
try {
while (running) {
- final long start = clock.millis();
- final List<ContainerInfo> containers =
- containerManager.getContainers();
- containers.forEach(this::processContainer);
-
- LOG.info("Replication Monitor Thread took {} milliseconds for" +
- " processing {} containers.", clock.millis() - start,
- containers.size());
-
+ processAll();
wait(rmConf.getInterval());
}
} catch (Throwable t) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index d01aafc..d92e769 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -214,9 +214,9 @@ public class TestReplicationManager {
public void testOpenContainer() throws SCMException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.OPEN);
containerStateManager.loadContainer(container);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
@@ -249,9 +249,9 @@ public class TestReplicationManager {
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -260,9 +260,9 @@ public class TestReplicationManager {
containerStateManager.updateContainerReplica(id, replica);
}
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
}
@@ -295,9 +295,9 @@ public class TestReplicationManager {
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
// Two of the replicas are in OPEN state
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
@@ -333,9 +333,9 @@ public class TestReplicationManager {
// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
@@ -374,9 +374,9 @@ public class TestReplicationManager {
// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
// Make the first replica unhealthy
@@ -385,9 +385,9 @@ public class TestReplicationManager {
replicaOne.getDatanodeDetails());
containerStateManager.updateContainerReplica(id, unhealthyReplica);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
@@ -398,9 +398,9 @@ public class TestReplicationManager {
containerStateManager.removeContainerReplica(id, replicaOne);
// The container is under replicated as unhealthy replica is removed
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
// We should get replicate command
Assert.assertEquals(currentReplicateCommandCount + 1,
@@ -436,9 +436,9 @@ public class TestReplicationManager {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}
@@ -473,9 +473,9 @@ public class TestReplicationManager {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
@@ -505,9 +505,9 @@ public class TestReplicationManager {
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
@@ -553,7 +553,7 @@ public class TestReplicationManager {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
GenericTestUtils.waitFor(
() -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
@@ -579,9 +579,9 @@ public class TestReplicationManager {
* iteration it should delete the unhealthy replica.
*/
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
// ReplicaTwo should be deleted, that is the unhealthy one
@@ -597,9 +597,9 @@ public class TestReplicationManager {
* is under replicated now
*/
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentReplicateCommandCount + 2,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
@@ -629,9 +629,9 @@ public class TestReplicationManager {
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
// All the replicas have same BCSID, so all of them will be closed.
Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
@@ -659,9 +659,9 @@ public class TestReplicationManager {
containerStateManager.updateContainerReplica(id, replica);
}
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
@@ -687,10 +687,9 @@ public class TestReplicationManager {
Mockito.mock(CloseContainerEventHandler.class);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
- replicationManager.processContainersNow();
-
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Mockito.verify(closeContainerHandler, Mockito.times(1))
.onMessage(id, eventQueue);
}
@@ -738,9 +737,9 @@ public class TestReplicationManager {
int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
// At this stage, due to the mocked calls to validteContainerPlacement
// the mis-replicated racks will not have improved, so expect to see nothing
// scheduled.
@@ -760,9 +759,9 @@ public class TestReplicationManager {
currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
// At this stage, due to the mocked calls to validteContainerPlacement
// the mis-replicated racks will not have improved, so expect to see nothing
// scheduled.
@@ -806,9 +805,9 @@ public class TestReplicationManager {
int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
// The unhealthy replica should be removed, but not the other replica
// as each time we test with 3 replicas, Mockitor ensures it returns
// mis-replicated
@@ -850,9 +849,9 @@ public class TestReplicationManager {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}
@@ -890,9 +889,9 @@ public class TestReplicationManager {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}
@@ -1057,9 +1056,9 @@ public class TestReplicationManager {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
// Get the DECOM and Maint replica and ensure none of them are scheduled
@@ -1401,9 +1400,9 @@ public class TestReplicationManager {
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
- replicationManager.processContainersNow();
+ replicationManager.processAll();
// Wait for EventQueue to call the event handler
- Thread.sleep(100L);
+ eventQueue.processAll(1000);
Assert.assertEquals(currentReplicateCommandCount + delta,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 108ccf8..26afe7e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -807,7 +807,7 @@ public class TestStorageContainerManager {
Thread.sleep(5000);
// Give ReplicationManager some time to process the containers.
cluster.getStorageContainerManager()
- .getReplicationManager().processContainersNow();
+ .getReplicationManager().processAll();
Thread.sleep(5000);
verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 7941c7b..d8e80b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -335,9 +335,7 @@ public class TestBlockDeletion {
});
cluster.shutdownHddsDatanode(0);
- scm.getReplicationManager().processContainersNow();
- // Wait for container state change to DELETING
- Thread.sleep(100);
+ scm.getReplicationManager().processAll();
containerInfos = scm.getContainerManager().getContainers();
containerInfos.stream().forEach(container ->
Assert.assertEquals(HddsProtos.LifeCycleState.DELETING,
@@ -346,17 +344,13 @@ public class TestBlockDeletion {
LogCapturer.captureLogs(ReplicationManager.LOG);
logCapturer.clearOutput();
- scm.getReplicationManager().processContainersNow();
- Thread.sleep(100);
- // Wait for delete replica command resend
+ scm.getReplicationManager().processAll();
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("Resend delete Container"), 500, 5000);
cluster.restartHddsDatanode(0, true);
Thread.sleep(100);
- scm.getReplicationManager().processContainersNow();
- // Wait for container state change to DELETED
- Thread.sleep(100);
+ scm.getReplicationManager().processAll();
containerInfos = scm.getContainerManager().getContainers();
containerInfos.stream().forEach(container -> {
Assert.assertEquals(HddsProtos.LifeCycleState.DELETED,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 8b55e1b..6243e4b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -135,7 +135,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
} catch (InterruptedException e) {
}
cluster.getStorageContainerManager().getReplicationManager()
- .processContainersNow();
+ .processAll();
try {
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]