This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c893b933ef HDDS-9774. Store start time of datanode decommissioning
(#5798)
c893b933ef is described below
commit c893b933ef4ff8739c50081680700e016f694bc8
Author: Tejaskriya <[email protected]>
AuthorDate: Wed Jan 3 20:46:53 2024 +0530
HDDS-9774. Store start time of datanode decommissioning (#5798)
---
.../hadoop/hdds/scm/node/DatanodeAdminMonitor.java | 2 +-
.../hdds/scm/node/DatanodeAdminMonitorImpl.java | 122 +++++++++++++++------
.../hdds/scm/node/NodeDecommissionMetrics.java | 17 ++-
.../hdds/scm/node/TestDatanodeAdminMonitor.java | 46 ++++++--
.../hdds/scm/node/TestNodeDecommissionMetrics.java | 22 ++++
5 files changed, 164 insertions(+), 45 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java
index bd224e45a7..e0b4c3ce54 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java
@@ -29,6 +29,6 @@ public interface DatanodeAdminMonitor extends Runnable {
void startMonitoring(DatanodeDetails dn);
void stopMonitoring(DatanodeDetails dn);
- Set<DatanodeDetails> getTrackedNodes();
+ Set<DatanodeAdminMonitorImpl.TrackedNode> getTrackedNodes();
void setMetrics(NodeDecommissionMetrics metrics);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index a7423a79dc..51c6d12dea 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.node;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -41,13 +42,13 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
@@ -77,9 +78,9 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
private EventPublisher eventQueue;
private NodeManager nodeManager;
private ReplicationManager replicationManager;
- private Queue<DatanodeDetails> pendingNodes = new ArrayDeque();
- private Queue<DatanodeDetails> cancelledNodes = new ArrayDeque();
- private Set<DatanodeDetails> trackedNodes = new HashSet<>();
+ private Queue<TrackedNode> pendingNodes = new ArrayDeque();
+ private Queue<TrackedNode> cancelledNodes = new ArrayDeque();
+ private Set<TrackedNode> trackedNodes = ConcurrentHashMap.newKeySet();
private NodeDecommissionMetrics metrics;
private long pipelinesWaitingToClose = 0;
private long sufficientlyReplicatedContainers = 0;
@@ -88,6 +89,41 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
private long unClosedContainers = 0;
private long underReplicatedContainers = 0;
+ /**
+ * Wrapper for a datanode tracked in the decommissioning and
+ * maintenance workflow, recording the time at which tracking started.
+ */
+ public static final class TrackedNode {
+
+ private DatanodeDetails datanodeDetails;
+
+ private long startTime = 0L;
+
+ public TrackedNode(DatanodeDetails datanodeDetails, long startTime) {
+ this.datanodeDetails = datanodeDetails;
+ this.startTime = startTime;
+ }
+
+ @Override
+ public int hashCode() {
+ return datanodeDetails.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof TrackedNode &&
+ datanodeDetails.equals(((TrackedNode) obj).datanodeDetails);
+ }
+
+ public DatanodeDetails getDatanodeDetails() {
+ return datanodeDetails;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+ }
+
private Map<String, ContainerStateInWorkflow> containerStateByHost;
private static final Logger LOG =
@@ -119,8 +155,9 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
*/
@Override
public synchronized void startMonitoring(DatanodeDetails dn) {
- cancelledNodes.remove(dn);
- pendingNodes.add(dn);
+ TrackedNode tn = new TrackedNode(dn, System.currentTimeMillis());
+ cancelledNodes.remove(tn);
+ pendingNodes.add(tn);
}
/**
@@ -132,8 +169,9 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
*/
@Override
public synchronized void stopMonitoring(DatanodeDetails dn) {
- pendingNodes.remove(dn);
- cancelledNodes.add(dn);
+ TrackedNode tn = new TrackedNode(dn, 0L);
+ pendingNodes.remove(tn);
+ cancelledNodes.add(tn);
}
public synchronized void setMetrics(NodeDecommissionMetrics metrics) {
@@ -143,13 +181,32 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
/**
* Get the set of nodes which are currently tracked in the decommissioned
* and maintenance workflow.
+ *
* @return An unmodifiable set of the tracked nodes.
*/
@Override
- public synchronized Set<DatanodeDetails> getTrackedNodes() {
+ public synchronized Set<TrackedNode> getTrackedNodes() {
return Collections.unmodifiableSet(trackedNodes);
}
+ /**
+ * Get a node which is currently tracked in the decommissioned
+ * or maintenance workflow.
+ *
+ * @return tracked node with the given IP address, or null if no
+ * tracked node matches.
+ */
+ @VisibleForTesting
+ public synchronized TrackedNode getSingleTrackedNode(String ip) {
+ Iterator<TrackedNode> iterator = trackedNodes.iterator();
+ while (iterator.hasNext()) {
+ TrackedNode trackedNode = iterator.next();
+ if (trackedNode.getDatanodeDetails().getIpAddress().equals(ip)) {
+ return trackedNode;
+ }
+ }
+ return null;
+ }
+
/**
* Run an iteration of the monitor. This is the main run loop, and performs
* the following checks:
@@ -219,10 +276,10 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
private void processCancelledNodes() {
while (!cancelledNodes.isEmpty()) {
- DatanodeDetails dn = cancelledNodes.poll();
+ TrackedNode dn = cancelledNodes.poll();
try {
- stopTrackingNode(dn);
- putNodeBackInService(dn);
+ stopTrackingNode(dn.getDatanodeDetails());
+ putNodeBackInService(dn.getDatanodeDetails());
LOG.info("Recommissioned node {}", dn);
} catch (NodeNotFoundException e) {
LOG.warn("Failed processing the cancel admin request for {}", dn, e);
@@ -238,10 +295,11 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
private void processTransitioningNodes() {
resetContainerMetrics();
- Iterator<DatanodeDetails> iterator = trackedNodes.iterator();
+ Iterator<TrackedNode> iterator = trackedNodes.iterator();
while (iterator.hasNext()) {
- DatanodeDetails dn = iterator.next();
+ TrackedNode trackedNode = iterator.next();
+ DatanodeDetails dn = trackedNode.getDatanodeDetails();
try {
NodeStatus status = getNodeStatus(dn);
@@ -260,12 +318,12 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
}
if (status.isDecommissioning() || status.isEnteringMaintenance()) {
- if (checkPipelinesClosedOnNode(dn)
+ if (checkPipelinesClosedOnNode(trackedNode)
// Ensure the DN has received and persisted the current maint
// state.
&& status.getOperationalState()
== dn.getPersistedOpState()
- && checkContainersReplicatedOnNode(dn)) {
+ && checkContainersReplicatedOnNode(trackedNode)) {
// CheckContainersReplicatedOnNode may take a short time to run
// so after it completes, re-get the nodestatus to check the health
// and ensure the state is still good to continue
@@ -319,25 +377,26 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
return true;
}
- private boolean checkPipelinesClosedOnNode(DatanodeDetails dn)
+ private boolean checkPipelinesClosedOnNode(TrackedNode dn)
throws NodeNotFoundException {
- Set<PipelineID> pipelines = nodeManager.getPipelines(dn);
- NodeStatus status = nodeManager.getNodeStatus(dn);
+ Set<PipelineID> pipelines = nodeManager.getPipelines(dn
+ .getDatanodeDetails());
+ NodeStatus status = nodeManager.getNodeStatus(dn.getDatanodeDetails());
if (pipelines == null || pipelines.size() == 0
|| status.operationalStateExpired()) {
return true;
} else {
LOG.info("Waiting for pipelines to close for {}. There are {} " +
"pipelines", dn, pipelines.size());
- containerStateByHost.put(dn.getHostName(),
- new ContainerStateInWorkflow(dn.getHostName(), 0L, 0L, 0L,
- pipelines.size()));
+ containerStateByHost.put(dn.getDatanodeDetails().getHostName(),
+ new ContainerStateInWorkflow(dn.getDatanodeDetails().getHostName(),
+ 0L, 0L, 0L, pipelines.size(), dn.getStartTime()));
pipelinesWaitingToClose += pipelines.size();
return false;
}
}
- private boolean checkContainersReplicatedOnNode(DatanodeDetails dn)
+ private boolean checkContainersReplicatedOnNode(TrackedNode dn)
throws NodeNotFoundException {
int sufficientlyReplicated = 0;
int deleting = 0;
@@ -346,7 +405,7 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
List<ContainerID> underReplicatedIDs = new ArrayList<>();
List<ContainerID> unClosedIDs = new ArrayList<>();
Set<ContainerID> containers =
- nodeManager.getContainers(dn);
+ nodeManager.getContainers(dn.getDatanodeDetails());
for (ContainerID cid : containers) {
try {
ContainerReplicaCount replicaSet =
@@ -383,7 +442,7 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
".legacy", false);
boolean replicatedOK;
if (legacyEnabled) {
- replicatedOK = replicaSet.isSufficientlyReplicatedForOffline(dn,
nodeManager);
+ replicatedOK =
replicaSet.isSufficientlyReplicatedForOffline(dn.getDatanodeDetails(),
nodeManager);
} else {
ReplicationManagerReport report = new ReplicationManagerReport();
replicationManager.checkContainerStatus(replicaSet.getContainer(),
report);
@@ -408,12 +467,12 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
LOG.info("{} has {} sufficientlyReplicated, {} deleting, {} " +
"underReplicated and {} unclosed containers",
dn, sufficientlyReplicated, deleting, underReplicated, unclosed);
- containerStateByHost.put(dn.getHostName(),
- new ContainerStateInWorkflow(dn.getHostName(),
+ containerStateByHost.put(dn.getDatanodeDetails().getHostName(),
+ new ContainerStateInWorkflow(dn.getDatanodeDetails().getHostName(),
sufficientlyReplicated,
underReplicated,
unclosed,
- 0L));
+ 0L, dn.getStartTime()));
sufficientlyReplicatedContainers += sufficientlyReplicated;
underReplicatedContainers += underReplicated;
unClosedContainers += unclosed;
@@ -461,13 +520,14 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
putNodeBackInService(dn);
}
- private void startTrackingNode(DatanodeDetails dn) {
- eventQueue.fireEvent(SCMEvents.START_ADMIN_ON_NODE, dn);
+ private void startTrackingNode(TrackedNode dn) {
+ eventQueue.fireEvent(SCMEvents.START_ADMIN_ON_NODE,
+ dn.getDatanodeDetails());
trackedNodes.add(dn);
}
private void stopTrackingNode(DatanodeDetails dn) {
- trackedNodes.remove(dn);
+ trackedNodes.remove(new TrackedNode(dn, 0L));
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionMetrics.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionMetrics.java
index 8217594ecf..460450c29c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionMetrics.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionMetrics.java
@@ -71,6 +71,7 @@ public final class NodeDecommissionMetrics implements
MetricsSource {
private long underReplicatedContainers = 0;
private String host = "";
private long pipelinesWaitingToClose = 0;
+ private long startTime = 0;
private static final MetricsInfo HOST_UNDER_REPLICATED = Interns.info(
"UnderReplicatedDN",
@@ -94,16 +95,22 @@ public final class NodeDecommissionMetrics implements
MetricsSource {
private static final MetricsInfo HOST_UNCLOSED_CONTAINERS =
Interns.info("UnclosedContainersDN",
"Number of containers not fully closed for host in decommissioning and
maintenance mode");
+ private static final MetricsInfo HOST_START_TIME =
Interns.info("StartTimeDN",
+ "Time at which decommissioning was started");
+
+
public ContainerStateInWorkflow(String host,
long sufficiently,
long under,
long unclosed,
- long pipelinesToClose) {
+ long pipelinesToClose,
+ long startTime) {
this.host = host;
sufficientlyReplicated = sufficiently;
underReplicatedContainers = under;
unclosedContainers = unclosed;
pipelinesWaitingToClose = pipelinesToClose;
+ this.startTime = startTime;
}
public String getHost() {
@@ -125,6 +132,10 @@ public final class NodeDecommissionMetrics implements
MetricsSource {
public long getUnclosedContainers() {
return unclosedContainers;
}
+
+ public long getStartTime() {
+ return startTime;
+ }
}
private MetricsRegistry registry;
@@ -179,7 +190,9 @@ public final class NodeDecommissionMetrics implements
MetricsSource {
.addGauge(ContainerStateInWorkflow.HOST_SUFFICIENTLY_REPLICATED,
e.getValue().getSufficientlyReplicated())
.addGauge(ContainerStateInWorkflow.HOST_UNCLOSED_CONTAINERS,
- e.getValue().getUnclosedContainers());
+ e.getValue().getUnclosedContainers())
+ .addGauge(ContainerStateInWorkflow.HOST_START_TIME,
+ e.getValue().getStartTime());
}
recordBuilder.endRecord();
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 17107cfa95..715041f00a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -190,7 +190,6 @@ public class TestDatanodeAdminMonitor {
// REPLICATE_CONTAINERS as there are no pipelines to close.
monitor.startMonitoring(dn1);
monitor.run();
- DatanodeDetails node = getFirstTrackedNode();
assertEquals(1, monitor.getTrackedNodeCount());
assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
nodeManager.getNodeStatus(dn1).getOperationalState());
@@ -493,7 +492,6 @@ public class TestDatanodeAdminMonitor {
// REPLICATE_CONTAINERS as there are no pipelines to close.
monitor.startMonitoring(dn1);
monitor.run();
- DatanodeDetails node = getFirstTrackedNode();
assertEquals(1, monitor.getTrackedNodeCount());
assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
nodeManager.getNodeStatus(dn1).getOperationalState());
@@ -550,7 +548,6 @@ public class TestDatanodeAdminMonitor {
monitor.startMonitoring(dn1);
monitor.run();
assertEquals(1, monitor.getTrackedNodeCount());
- DatanodeDetails node = getFirstTrackedNode();
assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
nodeManager.getNodeStatus(dn1).getOperationalState());
@@ -588,7 +585,6 @@ public class TestDatanodeAdminMonitor {
monitor.startMonitoring(dn1);
monitor.run();
assertEquals(1, monitor.getTrackedNodeCount());
- DatanodeDetails node = getFirstTrackedNode();
assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
nodeManager.getNodeStatus(dn1).getOperationalState());
@@ -603,6 +599,35 @@ public class TestDatanodeAdminMonitor {
nodeManager.getNodeStatus(dn1).getOperationalState());
}
+ @Test
+ public void testStartTimeMetricWhenNodesDecommissioned()
+ throws NodeNotFoundException, ContainerNotFoundException {
+ DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+ nodeManager.register(dn1,
+ new NodeStatus(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+ HddsProtos.NodeState.HEALTHY));
+ nodeManager.setContainers(dn1, generateContainers(3));
+ DatanodeAdminMonitorTestUtil
+ .mockGetContainerReplicaCount(
+ repManager,
+ true,
+ HddsProtos.LifeCycleState.CLOSED,
+ DECOMMISSIONED,
+ IN_SERVICE,
+ IN_SERVICE);
+
+ long beforeTime = System.currentTimeMillis();
+ monitor.startMonitoring(dn1);
+ monitor.run();
+ long afterTime = System.currentTimeMillis();
+
+ assertEquals(1, monitor.getTrackedNodeCount());
+ long monitoredTime = monitor.getSingleTrackedNode(dn1.getIpAddress())
+ .getStartTime();
+ assertTrue(monitoredTime >= beforeTime);
+ assertTrue(monitoredTime <= afterTime);
+ }
+
@Test
public void testMaintenanceWaitsForMaintenanceToComplete()
throws NodeNotFoundException {
@@ -616,7 +641,7 @@ public class TestDatanodeAdminMonitor {
monitor.startMonitoring(dn1);
monitor.run();
assertEquals(1, monitor.getTrackedNodeCount());
- DatanodeDetails node = getFirstTrackedNode();
+ DatanodeDetails node = getFirstTrackedNode().getDatanodeDetails();
assertTrue(nodeManager.getNodeStatus(dn1).isInMaintenance());
// Running the monitor again causes the node to remain in maintenance
@@ -646,7 +671,7 @@ public class TestDatanodeAdminMonitor {
// Add the node to the monitor
monitor.startMonitoring(dn1);
monitor.run();
- DatanodeDetails node = getFirstTrackedNode();
+ DatanodeDetails node = getFirstTrackedNode().getDatanodeDetails();
assertEquals(1, monitor.getTrackedNodeCount());
assertTrue(nodeManager.getNodeStatus(dn1).isEnteringMaintenance());
@@ -684,7 +709,7 @@ public class TestDatanodeAdminMonitor {
monitor.startMonitoring(dn1);
monitor.run();
assertEquals(1, monitor.getTrackedNodeCount());
- DatanodeDetails node = getFirstTrackedNode();
+ DatanodeDetails node = getFirstTrackedNode().getDatanodeDetails();
assertTrue(nodeManager.getNodeStatus(dn1).isEnteringMaintenance());
nodeManager.setNodeOperationalState(node,
@@ -708,7 +733,6 @@ public class TestDatanodeAdminMonitor {
monitor.startMonitoring(dn1);
monitor.run();
assertEquals(1, monitor.getTrackedNodeCount());
- DatanodeDetails node = getFirstTrackedNode();
assertTrue(nodeManager.getNodeStatus(dn1).isInMaintenance());
// Set the node dead and ensure the workflow does not end
@@ -735,7 +759,6 @@ public class TestDatanodeAdminMonitor {
monitor.startMonitoring(dn1);
monitor.run();
assertEquals(1, monitor.getTrackedNodeCount());
- DatanodeDetails node = getFirstTrackedNode();
assertTrue(nodeManager.getNodeStatus(dn1).isInMaintenance());
// Now cancel the node and run the monitor, the node should be IN_SERVICE
@@ -765,8 +788,9 @@ public class TestDatanodeAdminMonitor {
* the monitor.
* @return DatanodeAdminNodeDetails for the first tracked node found.
*/
- private DatanodeDetails getFirstTrackedNode() {
+ private DatanodeAdminMonitorImpl.TrackedNode getFirstTrackedNode() {
return
- monitor.getTrackedNodes().toArray(new DatanodeDetails[0])[0];
+ monitor.getTrackedNodes().toArray(new
+ DatanodeAdminMonitorImpl.TrackedNode[0])[0];
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionMetrics.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionMetrics.java
index 7a6a888780..598e5e48a9 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionMetrics.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionMetrics.java
@@ -344,4 +344,26 @@ public class TestNodeDecommissionMetrics {
Assertions.assertEquals(3,
metrics.getPipelinesWaitingToCloseTotal());
}
+
+ @Test
+ public void testDecommMonitorStartTimeForHost() {
+ // verify the decommissioning start time recorded for a datanode
+ DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+
+ nodeManager.register(dn1,
+ new NodeStatus(DECOMMISSIONING,
+ HddsProtos.NodeState.HEALTHY));
+
+ nodeManager.setPipelines(dn1, 2);
+
+ long before = System.currentTimeMillis();
+ monitor.startMonitoring(dn1);
+ long after = System.currentTimeMillis();
+
+ monitor.run();
+ long startTime = monitor.getSingleTrackedNode(dn1.getIpAddress())
+ .getStartTime();
+ Assertions.assertTrue(before <= startTime);
+ Assertions.assertTrue(after >= startTime);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]