This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b447ffc HDDS-5401. Add more metrics to ReplicationManager to help
monitor replication progress. (#2382)
b447ffc is described below
commit b447ffcccce368ccad1e053ce264c3923ae423c4
Author: Gui Hecheng <[email protected]>
AuthorDate: Mon Aug 2 11:38:52 2021 +0800
HDDS-5401. Add more metrics to ReplicationManager to help monitor
replication progress. (#2382)
---
.../hdds/scm/container/ReplicationManager.java | 117 +++++-----
.../replication/ReplicationManagerMetrics.java | 161 ++++++++++++++
.../hdds/scm/container/TestReplicationManager.java | 245 ++++++++++++++++++---
3 files changed, 438 insertions(+), 85 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 4ab69a0..69f0c1d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -30,7 +30,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +40,7 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import
org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -66,10 +67,6 @@ import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.metrics2.MetricsCollector;
-import org.apache.hadoop.metrics2.MetricsInfo;
-import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -92,7 +89,7 @@ import org.slf4j.LoggerFactory;
* that the containers are properly replicated. Replication Manager deals only
* with Quasi Closed / Closed container.
*/
-public class ReplicationManager implements MetricsSource, SCMService {
+public class ReplicationManager implements SCMService {
public static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
@@ -231,6 +228,11 @@ public class ReplicationManager implements MetricsSource,
SCMService {
private final Clock clock;
/**
+ * Replication progress related metrics.
+ */
+ private ReplicationManagerMetrics metrics;
+
+ /**
* Constructs ReplicationManager instance with the given configuration.
*
* @param conf OzoneConfiguration
@@ -265,6 +267,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
+ this.metrics = null;
// register ReplicationManager to SCMServiceManager.
serviceManager.register(this);
@@ -279,10 +282,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
@Override
public synchronized void start() {
if (!isRunning()) {
- DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
- "SCM Replication manager (closed container replication) related "
- + "metrics",
- this);
+ metrics = ReplicationManagerMetrics.create(this);
LOG.info("Starting Replication Monitor Thread.");
running = true;
replicationMonitor = new Thread(this::run);
@@ -321,7 +321,7 @@ public class ReplicationManager implements MetricsSource,
SCMService {
inflightMove.clear();
inflightMoveFuture.clear();
running = false;
- DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
+ metrics.unRegister();
notifyAll();
} else {
LOG.info("Replication Monitor Thread is not running.");
@@ -420,12 +420,20 @@ public class ReplicationManager implements MetricsSource,
SCMService {
*/
updateInflightAction(container, inflightReplication,
action -> replicas.stream()
- .anyMatch(r ->
r.getDatanodeDetails().equals(action.datanode)));
+ .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)),
+ ()-> metrics.incrNumReplicationCmdsTimeout(),
+ () -> {
+ metrics.incrNumReplicationCmdsCompleted();
+ metrics.incrNumReplicationBytesCompleted(
+ container.getUsedBytes());
+ });
updateInflightAction(container, inflightDeletion,
action -> replicas.stream()
.noneMatch(r ->
- r.getDatanodeDetails().equals(action.datanode)));
+ r.getDatanodeDetails().equals(action.datanode)),
+ () -> metrics.incrNumDeletionCmdsTimeout(),
+ () -> metrics.incrNumDeletionCmdsCompleted());
/*
* If container is under deleting and all it's replicas are deleted,
@@ -507,10 +515,14 @@ public class ReplicationManager implements MetricsSource,
SCMService {
* @param container Container to update
* @param inflightActions inflightReplication (or) inflightDeletion
* @param filter filter to check if the operation is completed
+ * @param timeoutCounter update timeout metrics
+ * @param completedCounter update completed metrics
*/
private void updateInflightAction(final ContainerInfo container,
final Map<ContainerID, List<InflightAction>> inflightActions,
- final Predicate<InflightAction> filter) {
+ final Predicate<InflightAction> filter,
+ final Runnable timeoutCounter,
+ final Runnable completedCounter) {
final ContainerID id = container.containerID();
final long deadline = clock.millis() - rmConf.getEventTimeout();
if (inflightActions.containsKey(id)) {
@@ -528,6 +540,13 @@ public class ReplicationManager implements MetricsSource,
SCMService {
NodeOperationalState.IN_SERVICE;
if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
iter.remove();
+
+ if (isTimeout) {
+ timeoutCounter.run();
+ } else if (isCompleted) {
+ completedCounter.run();
+ }
+
updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
isNotInService, container, a.datanode, inflightActions);
}
@@ -1439,6 +1458,9 @@ public class ReplicationManager implements MetricsSource,
SCMService {
inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
sendAndTrackDatanodeCommand(datanode, replicateCommand,
action -> inflightReplication.get(id).add(action));
+
+ metrics.incrNumReplicationCmdsSent();
+ metrics.incrNumReplicationBytesTotal(container.getUsedBytes());
}
/**
@@ -1462,6 +1484,8 @@ public class ReplicationManager implements MetricsSource,
SCMService {
inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
sendAndTrackDatanodeCommand(datanode, deleteCommand,
action -> inflightDeletion.get(id).add(action));
+
+ metrics.incrNumDeletionCmdsSent();
}
/**
@@ -1547,22 +1571,10 @@ public class ReplicationManager implements
MetricsSource, SCMService {
.allMatch(r -> ReplicationManager.compareState(state, r.getState()));
}
- @Override
- public void getMetrics(MetricsCollector collector, boolean all) {
- collector.addRecord(ReplicationManager.class.getSimpleName())
- .addGauge(ReplicationManagerMetrics.INFLIGHT_REPLICATION,
- inflightReplication.size())
- .addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION,
- inflightDeletion.size())
- .addGauge(ReplicationManagerMetrics.INFLIGHT_MOVE,
- inflightMove.size())
- .endRecord();
- }
-
/**
* Wrapper class to hold the InflightAction with its start time.
*/
- private static final class InflightAction {
+ static final class InflightAction {
private final DatanodeDetails datanode;
private final long time;
@@ -1572,6 +1584,11 @@ public class ReplicationManager implements
MetricsSource, SCMService {
this.datanode = datanode;
this.time = time;
}
+
+ @VisibleForTesting
+ public DatanodeDetails getDatanode() {
+ return datanode;
+ }
}
/**
@@ -1645,35 +1662,6 @@ public class ReplicationManager implements
MetricsSource, SCMService {
}
}
- /**
- * Metric name definitions for Replication manager.
- */
- public enum ReplicationManagerMetrics implements MetricsInfo {
-
- INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
- INFLIGHT_DELETION("Tracked inflight container deletion requests."),
- INFLIGHT_MOVE("Tracked inflight container move requests.");
-
- private final String desc;
-
- ReplicationManagerMetrics(String desc) {
- this.desc = desc;
- }
-
- @Override
- public String description() {
- return desc;
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ", this.getClass().getSimpleName() + "{", "}")
- .add("name=" + name())
- .add("description=" + desc)
- .toString();
- }
- }
-
@Override
public void notifyStatusChanged() {
serviceLock.lock();
@@ -1711,4 +1699,21 @@ public class ReplicationManager implements
MetricsSource, SCMService {
public String getServiceName() {
return ReplicationManager.class.getSimpleName();
}
+
+ /**
+ * Returns the metrics of this manager. The field is set to null in the
+ * constructor and assigned in start(), so this returns null until the
+ * manager has been started; stop() unregisters the metrics source but does
+ * not clear the field.
+ */
+ public ReplicationManagerMetrics getMetrics() {
+ return this.metrics;
+ }
+
+ /**
+ * Returns the inflight replication actions keyed by container ID.
+ * This is the live internal map, not a copy; callers should treat it as
+ * read-only. InflightAction is package-private, so callers outside this
+ * package can query the map (e.g. size()) but cannot name its element type.
+ */
+ public Map<ContainerID, List<InflightAction>> getInflightReplication() {
+ return inflightReplication;
+ }
+
+ /**
+ * Returns the inflight deletion actions keyed by container ID.
+ * This is the live internal map, not a copy; callers should treat it as
+ * read-only.
+ */
+ public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
+ return inflightDeletion;
+ }
+
+ /**
+ * Returns the inflight container moves keyed by container ID. Each value is
+ * a pair of datanodes (presumably source and target -- confirm against the
+ * move scheduling code). This is the live internal map, not a copy.
+ */
+ public Map<ContainerID,
+ Pair<DatanodeDetails, DatanodeDetails>> getInflightMove() {
+ return inflightMove;
+ }
}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
new file mode 100644
index 0000000..69b1462
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Metrics related to {@link ReplicationManager}.
+ * <p>
+ * The counters are incremented by ReplicationManager as replication and
+ * deletion commands are sent, complete, or time out.
+ * <p>
+ * The inflight gauges are method-backed: {@code @Metric} is placed on the
+ * getters, so every metrics snapshot reads the current size of the
+ * ReplicationManager's inflight maps. Annotated {@code MutableGaugeLong}
+ * fields that are never {@code set()} would always publish 0.
+ */
+@Metrics(about = "Replication Manager Metrics", context = OzoneConsts.OZONE)
+public final class ReplicationManagerMetrics {
+
+  public static final String METRICS_SOURCE_NAME =
+      ReplicationManagerMetrics.class.getSimpleName();
+
+  // The @Metric counter fields are created and injected by the Hadoop
+  // metrics system when this source is registered (see create()).
+
+  @Metric("Number of replication commands sent.")
+  private MutableCounterLong numReplicationCmdsSent;
+
+  @Metric("Number of replication commands completed.")
+  private MutableCounterLong numReplicationCmdsCompleted;
+
+  @Metric("Number of replication commands timeout.")
+  private MutableCounterLong numReplicationCmdsTimeout;
+
+  @Metric("Number of deletion commands sent.")
+  private MutableCounterLong numDeletionCmdsSent;
+
+  @Metric("Number of deletion commands completed.")
+  private MutableCounterLong numDeletionCmdsCompleted;
+
+  @Metric("Number of deletion commands timeout.")
+  private MutableCounterLong numDeletionCmdsTimeout;
+
+  @Metric("Number of replication bytes total.")
+  private MutableCounterLong numReplicationBytesTotal;
+
+  @Metric("Number of replication bytes completed.")
+  private MutableCounterLong numReplicationBytesCompleted;
+
+  /** Manager whose inflight maps back the inflight gauges. */
+  private final ReplicationManager replicationManager;
+
+  /**
+   * Creates an unregistered instance. Prefer
+   * {@link #create(ReplicationManager)}: the counter fields stay null until
+   * the instance is registered with the metrics system.
+   *
+   * @param manager ReplicationManager whose inflight state is reported
+   */
+  public ReplicationManagerMetrics(ReplicationManager manager) {
+    this.replicationManager = manager;
+  }
+
+  /**
+   * Creates an instance for the given manager and registers it with the
+   * default metrics system under {@link #METRICS_SOURCE_NAME}.
+   *
+   * @param manager ReplicationManager whose inflight state is reported
+   * @return the registered instance
+   */
+  public static ReplicationManagerMetrics create(ReplicationManager manager) {
+    return DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
+        "SCM Replication manager (closed container replication) related "
+            + "metrics",
+        new ReplicationManagerMetrics(manager));
+  }
+
+  /** Unregisters {@link #METRICS_SOURCE_NAME} from the metrics system. */
+  public void unRegister() {
+    DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
+  }
+
+  public void incrNumReplicationCmdsSent() {
+    this.numReplicationCmdsSent.incr();
+  }
+
+  public void incrNumReplicationCmdsCompleted() {
+    this.numReplicationCmdsCompleted.incr();
+  }
+
+  public void incrNumReplicationCmdsTimeout() {
+    this.numReplicationCmdsTimeout.incr();
+  }
+
+  public void incrNumDeletionCmdsSent() {
+    this.numDeletionCmdsSent.incr();
+  }
+
+  public void incrNumDeletionCmdsCompleted() {
+    this.numDeletionCmdsCompleted.incr();
+  }
+
+  public void incrNumDeletionCmdsTimeout() {
+    this.numDeletionCmdsTimeout.incr();
+  }
+
+  /** Adds the used bytes of a container scheduled for replication. */
+  public void incrNumReplicationBytesTotal(long bytes) {
+    this.numReplicationBytesTotal.incr(bytes);
+  }
+
+  /** Adds the used bytes of a container whose replication completed. */
+  public void incrNumReplicationBytesCompleted(long bytes) {
+    this.numReplicationBytesCompleted.incr(bytes);
+  }
+
+  /** Number of containers with inflight replication actions (gauge). */
+  @Metric(value = "Tracked inflight container replication requests.",
+      type = Metric.Type.GAUGE)
+  public long getInflightReplication() {
+    return replicationManager.getInflightReplication().size();
+  }
+
+  /** Number of containers with inflight deletion actions (gauge). */
+  @Metric(value = "Tracked inflight container deletion requests.",
+      type = Metric.Type.GAUGE)
+  public long getInflightDeletion() {
+    return replicationManager.getInflightDeletion().size();
+  }
+
+  /** Number of containers with an inflight move (gauge). */
+  @Metric(value = "Tracked inflight container move requests.",
+      type = Metric.Type.GAUGE)
+  public long getInflightMove() {
+    return replicationManager.getInflightMove().size();
+  }
+
+  public long getNumReplicationCmdsSent() {
+    return this.numReplicationCmdsSent.value();
+  }
+
+  public long getNumReplicationCmdsCompleted() {
+    return this.numReplicationCmdsCompleted.value();
+  }
+
+  public long getNumReplicationCmdsTimeout() {
+    return this.numReplicationCmdsTimeout.value();
+  }
+
+  public long getNumDeletionCmdsSent() {
+    return this.numDeletionCmdsSent.value();
+  }
+
+  public long getNumDeletionCmdsCompleted() {
+    return this.numDeletionCmdsCompleted.value();
+  }
+
+  public long getNumDeletionCmdsTimeout() {
+    return this.numDeletionCmdsTimeout.value();
+  }
+
+  public long getNumReplicationBytesTotal() {
+    return this.numReplicationBytesTotal.value();
+  }
+
+  public long getNumReplicationBytesCompleted() {
+    return this.numReplicationBytesCompleted.value();
+  }
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 1304258..d7fcde7 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -215,7 +215,6 @@ public class TestReplicationManager {
final ContainerInfo container = getContainer(LifeCycleState.OPEN);
containerStateManager.loadContainer(container);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
@@ -250,7 +249,6 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -261,7 +259,6 @@ public class TestReplicationManager {
}
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -296,7 +293,6 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
// Two of the replicas are in OPEN state
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -334,7 +330,6 @@ public class TestReplicationManager {
// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
@@ -353,6 +348,7 @@ public class TestReplicationManager {
throws SCMException, ContainerNotFoundException, InterruptedException,
ContainerReplicaNotFoundException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ container.setUsedBytes(100);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
@@ -375,7 +371,6 @@ public class TestReplicationManager {
// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
@@ -386,26 +381,59 @@ public class TestReplicationManager {
containerStateManager.updateContainerReplica(id, unhealthyReplica);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaOne.getDatanodeDetails()));
+ Assert.assertEquals(currentDeleteCommandCount + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
// Now we will delete the unhealthy replica from in-memory.
containerStateManager.removeContainerReplica(id, replicaOne);
+ final long currentBytesToReplicate = replicationManager.getMetrics()
+ .getNumReplicationBytesTotal();
+
// The container is under replicated as unhealthy replica is removed
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
// We should get replicate command
Assert.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
+ Assert.assertEquals(currentReplicateCommandCount + 1,
+ replicationManager.getMetrics().getNumReplicationCmdsSent());
+ Assert.assertEquals(currentBytesToReplicate + 100L,
+ replicationManager.getMetrics().getNumReplicationBytesTotal());
+ Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightReplication());
+
+ // Now we add the missing replica back
+ DatanodeDetails targetDn = replicationManager.getInflightReplication()
+ .get(id).get(0).getDatanode();
+ final ContainerReplica replicatedReplicaOne = getReplicas(
+ id, State.CLOSED, 1000L, originNodeId, targetDn);
+ containerStateManager.updateContainerReplica(id, replicatedReplicaOne);
+
+ final long currentReplicationCommandCompleted = replicationManager
+ .getMetrics().getNumReplicationCmdsCompleted();
+ final long currentBytesCompleted = replicationManager.getMetrics()
+ .getNumReplicationBytesCompleted();
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ Assert.assertEquals(0, replicationManager.getInflightReplication().size());
+ Assert.assertEquals(0, replicationManager.getMetrics()
+ .getInflightReplication());
+ Assert.assertEquals(currentReplicationCommandCompleted + 1,
+ replicationManager.getMetrics().getNumReplicationCmdsCompleted());
+ Assert.assertEquals(currentBytesCompleted + 100L,
+ replicationManager.getMetrics().getNumReplicationBytesCompleted());
}
/**
@@ -437,10 +465,38 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ Assert.assertEquals(currentDeleteCommandCount + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
+ Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightDeletion());
+
+ // Now we remove the replica according to inflight
+ DatanodeDetails targetDn = replicationManager.getInflightDeletion()
+ .get(id).get(0).getDatanode();
+ if (targetDn.equals(replicaOne.getDatanodeDetails())) {
+ containerStateManager.removeContainerReplica(id, replicaOne);
+ } else if (targetDn.equals(replicaTwo.getDatanodeDetails())) {
+ containerStateManager.removeContainerReplica(id, replicaTwo);
+ } else if (targetDn.equals(replicaThree.getDatanodeDetails())) {
+ containerStateManager.removeContainerReplica(id, replicaThree);
+ } else if (targetDn.equals(replicaFour.getDatanodeDetails())) {
+ containerStateManager.removeContainerReplica(id, replicaFour);
+ }
+
+ final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+ .getNumDeletionCmdsCompleted();
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+ Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(0, replicationManager.getMetrics()
+ .getInflightDeletion());
+ Assert.assertEquals(currentDeleteCommandCompleted + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsCompleted());
}
/**
@@ -474,13 +530,31 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaOne.getDatanodeDetails()));
+ Assert.assertEquals(currentDeleteCommandCount + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
+ Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightDeletion());
+
+ final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+ .getNumDeletionCmdsCompleted();
+ // Now we remove the replica to simulate deletion complete
+ containerStateManager.removeContainerReplica(id, replicaOne);
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ Assert.assertEquals(currentDeleteCommandCompleted + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsCompleted());
+ Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(0, replicationManager.getMetrics()
+ .getInflightDeletion());
}
/**
@@ -491,6 +565,7 @@ public class TestReplicationManager {
public void testUnderReplicatedQuasiClosedContainer() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ container.setUsedBytes(100);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
@@ -504,13 +579,44 @@ public class TestReplicationManager {
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+ final long currentBytesToReplicate = replicationManager.getMetrics()
+ .getNumReplicationBytesTotal();
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
+ Assert.assertEquals(currentReplicateCommandCount + 1,
+ replicationManager.getMetrics().getNumReplicationCmdsSent());
+ Assert.assertEquals(currentBytesToReplicate + 100,
+ replicationManager.getMetrics().getNumReplicationBytesTotal());
+ Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightReplication());
+
+ final long currentReplicateCommandCompleted = replicationManager
+ .getMetrics().getNumReplicationCmdsCompleted();
+ final long currentReplicateBytesCompleted = replicationManager
+ .getMetrics().getNumReplicationBytesCompleted();
+
+ // Now we add the replicated new replica
+ DatanodeDetails targetDn = replicationManager.getInflightReplication()
+ .get(id).get(0).getDatanode();
+ final ContainerReplica replicatedReplicaThree = getReplicas(
+ id, State.CLOSED, 1000L, originNodeId, targetDn);
+ containerStateManager.updateContainerReplica(id, replicatedReplicaThree);
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ Assert.assertEquals(currentReplicateCommandCompleted + 1,
+ replicationManager.getMetrics().getNumReplicationCmdsCompleted());
+ Assert.assertEquals(currentReplicateBytesCompleted + 100,
+ replicationManager.getMetrics().getNumReplicationBytesCompleted());
+ Assert.assertEquals(0, replicationManager.getInflightReplication().size());
+ Assert.assertEquals(0, replicationManager.getMetrics()
+ .getInflightReplication());
}
/**
@@ -554,7 +660,6 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
GenericTestUtils.waitFor(
() -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand),
@@ -580,7 +685,6 @@ public class TestReplicationManager {
*/
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
@@ -588,9 +692,16 @@ public class TestReplicationManager {
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaTwo.getDatanodeDetails()));
+ Assert.assertEquals(currentDeleteCommandCount + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
+ Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightDeletion());
containerStateManager.removeContainerReplica(id, replicaTwo);
+ final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+ .getNumDeletionCmdsCompleted();
/*
* We have now removed unhealthy replica, next iteration of
* ReplicationManager should re-replicate the container as it
@@ -598,11 +709,22 @@ public class TestReplicationManager {
*/
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
+
+ Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(0, replicationManager.getMetrics()
+ .getInflightDeletion());
+ Assert.assertEquals(currentDeleteCommandCompleted + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsCompleted());
+
Assert.assertEquals(currentReplicateCommandCount + 2,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
+ Assert.assertEquals(currentReplicateCommandCount + 2,
+ replicationManager.getMetrics().getNumReplicationCmdsSent());
+ Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightReplication());
}
@@ -630,7 +752,6 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
// All the replicas have same BCSID, so all of them will be closed.
@@ -660,7 +781,6 @@ public class TestReplicationManager {
}
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
@@ -688,7 +808,6 @@ public class TestReplicationManager {
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Mockito.verify(closeContainerHandler, Mockito.times(1))
.onMessage(id, eventQueue);
@@ -710,6 +829,7 @@ public class TestReplicationManager {
public void additionalReplicaScheduledWhenMisReplicated()
throws SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+ container.setUsedBytes(100);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
@@ -736,15 +856,23 @@ public class TestReplicationManager {
int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+ final long currentBytesToReplicate = replicationManager.getMetrics()
+ .getNumReplicationBytesTotal();
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
- // At this stage, due to the mocked calls to validteContainerPlacement
- // the mis-replicated racks will not have improved, so expect to see
nothing
- // scheduled.
+ // At this stage, due to the mocked calls to validateContainerPlacement
+ // the policy will not be satisfied, and replication will be triggered.
+
Assert.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
+ Assert.assertEquals(currentReplicateCommandCount + 1,
+ replicationManager.getMetrics().getNumReplicationCmdsSent());
+ Assert.assertEquals(currentBytesToReplicate + 100,
+ replicationManager.getMetrics().getNumReplicationBytesTotal());
+ Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightReplication());
// Now make it so that all containers seem mis-replicated no matter how
// many replicas. This will test replicas are not scheduled if the new
@@ -760,13 +888,17 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
- // At this stage, due to the mocked calls to validteContainerPlacement
+ // At this stage, due to the mocked calls to validateContainerPlacement
// the mis-replicated racks will not have improved, so expect to see nothing
// scheduled.
Assert.assertEquals(currentReplicateCommandCount, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
+ Assert.assertEquals(currentReplicateCommandCount,
+ replicationManager.getMetrics().getNumReplicationCmdsSent());
+ Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightReplication());
}
@Test
@@ -806,17 +938,21 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
// The unhealthy replica should be removed, but not the other replica
- // as each time we test with 3 replicas, Mockitor ensures it returns
+ // as each time we test with 3 replicas, Mockito ensures it returns
// mis-replicated
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ Assert.assertEquals(currentDeleteCommandCount + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaFive.getDatanodeDetails()));
+ Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightDeletion());
}
@Test
@@ -850,10 +986,14 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ Assert.assertEquals(currentDeleteCommandCount + 1,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
+ Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightDeletion());
}
@Test
@@ -890,10 +1030,14 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ Assert.assertEquals(currentDeleteCommandCount + 2,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
+ Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightDeletion());
}
/**
@@ -1057,10 +1201,14 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ Assert.assertEquals(currentDeleteCommandCount + 2,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
+ Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getInflightDeletion());
// Get the DECOM and Maint replica and ensure none of them are scheduled
// for removal
Set<ContainerReplica> decom =
@@ -1341,6 +1489,31 @@ public class TestReplicationManager {
// scheduled
clock.fastForward(timeout + 1000);
assertReplicaScheduled(1);
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getNumReplicationCmdsTimeout());
+ }
+
+ @Test
+ public void testDeleteCommandTimeout() throws
+ SCMException, InterruptedException {
+ long timeout = new ReplicationManagerConfiguration().getEventTimeout();
+
+ final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ assertDeleteScheduled(1);
+ // Already a pending delete, so no further delete or replica is scheduled
+ assertDeleteScheduled(0);
+ assertReplicaScheduled(0);
+
+ // Advance the clock past the timeout, and the delete should be
+ // scheduled again
+ clock.fastForward(timeout + 1000);
+ assertDeleteScheduled(1);
+ Assert.assertEquals(1, replicationManager.getMetrics()
+ .getNumDeletionCmdsTimeout());
}
private ContainerInfo createContainer(LifeCycleState containerState)
@@ -1393,11 +1566,25 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
replicationManager.processAll();
- // Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assert.assertEquals(currentReplicateCommandCount + delta,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
+ Assert.assertEquals(currentReplicateCommandCount + delta,
+ replicationManager.getMetrics().getNumReplicationCmdsSent());
+ }
+
+ private void assertDeleteScheduled(int delta) throws InterruptedException {
+ final int currentDeleteCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+ // Handler count doubles as metric baseline (assumes both stay in sync).
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+ Assert.assertEquals(currentDeleteCommandCount + delta,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+ Assert.assertEquals(currentDeleteCommandCount + delta,
+ replicationManager.getMetrics().getNumDeletionCmdsSent());
}
@After
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]