This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8eaca4c3bf2 HDDS-14010. [Recon] Endpoint to retrieve pending deletion metrics from DataNodes, SCM, and OM. (#9413)
8eaca4c3bf2 is described below
commit 8eaca4c3bf2972a7e9f84062ff4c1d49fa2ea6db
Author: Priyesh Karatha <[email protected]>
AuthorDate: Thu Dec 18 18:39:58 2025 +0530
HDDS-14010. [Recon] Endpoint to retrieve pending deletion metrics from DataNodes, SCM, and OM. (#9413)
---
.../common/src/main/resources/ozone-default.xml | 18 ++
.../dist/src/main/compose/ozone/docker-config | 2 +-
.../recon/TestStorageDistributionEndpoint.java | 190 +++++++++--
.../ozone/recon/MetricsServiceProviderFactory.java | 30 +-
.../hadoop/ozone/recon/ReconServerConfigKeys.java | 12 +-
.../org/apache/hadoop/ozone/recon/ReconUtils.java | 30 ++
.../ozone/recon/api/DataNodeMetricsService.java | 353 +++++++++++++++++++++
.../ozone/recon/api/PendingDeletionEndpoint.java | 110 +++++++
.../ozone/recon/api/ReconGlobalMetricsService.java | 31 +-
.../api/types/DataNodeMetricsServiceResponse.java | 130 ++++++++
.../api/types/DatanodePendingDeletionMetrics.java | 59 ++++
.../ozone/recon/api/types/ScmPendingDeletion.java | 58 ++++
.../ozone/recon/spi/MetricsServiceProvider.java | 5 +-
.../recon/spi/impl/JmxServiceProviderImpl.java | 117 +++++++
.../spi/impl/PrometheusServiceProviderImpl.java | 39 +--
.../recon/tasks/DataNodeMetricsCollectionTask.java | 87 +++++
16 files changed, 1194 insertions(+), 77 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index aed31414abf..7a67f27cc49 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3504,6 +3504,24 @@
If the buffer overflows, task reinitialization will be triggered.
</description>
</property>
+ <property>
+ <name>ozone.recon.dn.metrics.collection.minimum.api.delay</name>
+ <value>30s</value>
+ <tag>OZONE, RECON, DN</tag>
+ <description>
+ Minimum delay between API calls before a new JMX collection task is started.
+ It behaves like a rate limiter to avoid unnecessary task creation.
+ </description>
+ </property>
+ <property>
+ <name>ozone.recon.dn.metrics.collection.timeout</name>
+ <value>10m</value>
+ <tag>OZONE, RECON, DN</tag>
+ <description>
+ Maximum time allowed for the metrics collection API to complete.
+ If this timeout is exceeded, pending tasks will be cancelled.
+ </description>
+ </property>
<property>
<name>ozone.scm.datanode.admin.monitor.interval</name>
<value>30s</value>
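Note: a minimal sketch of tightening these two new settings programmatically, mirroring how the integration test below overrides the delay (the "2m" timeout value here is illustrative, not a recommendation from this patch):

    OzoneConfiguration conf = new OzoneConfiguration();
    // Rate-limit window between successive collection task starts.
    conf.set(ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, "5s");
    // Upper bound on a single collection round; tasks still pending beyond it are cancelled.
    conf.set(ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT, "2m");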
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index 738495873bf..0631cba616d 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -58,7 +58,7 @@ OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http
OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true
OZONE-SITE.XML_ozone.fs.hsync.enabled=true
-
+OZONE-SITE.XML_ozone.recon.dn.metrics.collection.minimum.api.delay=5s
OZONE_CONF_DIR=/etc/hadoop
OZONE_LOG_DIR=/var/log/hadoop
diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java
index c43958fe154..5ac779d6596 100644
--- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java
+++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java
@@ -22,19 +22,23 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.recon.TestReconEndpointUtil.getReconWebAddress;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -46,6 +50,8 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -66,6 +72,9 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService;
+import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
+import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion;
import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.ozone.test.GenericTestUtils;
@@ -100,6 +109,7 @@ public class TestStorageDistributionEndpoint {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String STORAGE_DIST_ENDPOINT = "/api/v1/storageDistribution";
+ private static final String PENDING_DELETION_ENDPOINT = "/api/v1/pendingDeletion";
static List<Arguments> replicationConfigs() {
return Collections.singletonList(
@@ -110,17 +120,14 @@ static List<Arguments> replicationConfigs() {
@BeforeAll
public static void setup() throws Exception {
conf = new OzoneConfiguration();
- conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, 100,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
- 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS);
conf.setLong(OZONE_SCM_HA_RATIS_SNAPSHOT_GAP, 1L);
- conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 50,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
- TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 50, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL, 500, TimeUnit.MILLISECONDS);
+
+ conf.set(ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, "5s");
// Enhanced SCM configuration for faster block deletion processing
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
@@ -129,18 +136,9 @@ public static void setup() throws Exception {
conf.set(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0s");
// Enhanced DataNode configuration to move pending deletion from SCM to DN faster
- DatanodeConfiguration dnConf =
- conf.getObject(DatanodeConfiguration.class);
- dnConf.setBlockDeletionInterval(Duration.ofMillis(100));
- // Increase block delete queue limit to allow more queued commands on DN
- dnConf.setBlockDeleteQueueLimit(50);
- // Reduce the interval for delete command worker processing
- dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100));
- // Increase blocks deleted per interval to speed up deletion
- dnConf.setBlockDeletionLimit(5000);
+ DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+ dnConf.setBlockDeletionInterval(Duration.ofMillis(30000));
conf.setFromObject(dnConf);
- // Increase DN delete threads for faster parallel processing
- conf.setInt("ozone.datanode.block.delete.threads.max", 10);
recon = new ReconService(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
@@ -190,19 +188,133 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig)
}
}
waitForKeysCreated(replicationConfig);
- Thread.sleep(10000);
- StringBuilder urlBuilder = new StringBuilder();
- urlBuilder.append(getReconWebAddress(conf))
- .append(STORAGE_DIST_ENDPOINT);
- String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
- StorageCapacityDistributionResponse storageResponse =
- MAPPER.readValue(response, StorageCapacityDistributionResponse.class);
-
- assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys());
- assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace());
- assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
- assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
- assertEquals(3, storageResponse.getDataNodeUsage().size());
+ GenericTestUtils.waitFor(this::verifyStorageDistributionAfterKeyCreation, 1000, 30000);
+ closeAllContainers();
+ fs.delete(dir1, true);
+ GenericTestUtils.waitFor(this::verifyPendingDeletionAfterKeyDeletionOm, 1000, 30000);
+ GenericTestUtils.waitFor(this::verifyPendingDeletionAfterKeyDeletionScm, 2000, 30000);
+ GenericTestUtils.waitFor(() ->
+     Objects.requireNonNull(scm.getClientProtocolServer().getDeletedBlockSummary()).getTotalBlockCount() == 0,
+     1000, 30000);
+ GenericTestUtils.waitFor(this::verifyPendingDeletionAfterKeyDeletionDn, 2000, 60000);
+ GenericTestUtils.waitFor(this::verifyPendingDeletionClearsAtDn, 2000, 60000);
+ cluster.getHddsDatanodes().get(0).stop();
+ GenericTestUtils.waitFor(this::verifyPendingDeletionAfterKeyDeletionOnDnFailure, 2000, 60000);
+ }
+
+ private boolean verifyStorageDistributionAfterKeyCreation() {
+ try {
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append(getReconWebAddress(conf)).append(STORAGE_DIST_ENDPOINT);
+ String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+ StorageCapacityDistributionResponse storageResponse =
+     MAPPER.readValue(response, StorageCapacityDistributionResponse.class);
+
+ assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys());
+ assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace());
+ assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
+ assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
+ assertEquals(3, storageResponse.getDataNodeUsage().size());
+
+ return true;
+ } catch (Exception e) {
+ LOG.debug("Waiting for storage distribution assertions to pass", e);
+ return false;
+ }
+ }
+
+ private boolean verifyPendingDeletionAfterKeyDeletionOm() {
+ try {
+ syncDataFromOM();
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append(getReconWebAddress(conf)).append(PENDING_DELETION_ENDPOINT).append("?component=om");
+ String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+ Map<String, Number> pendingDeletionMap = MAPPER.readValue(response, Map.class);
+ assertEquals(30L, pendingDeletionMap.get("totalSize").longValue());
+ assertEquals(30L, pendingDeletionMap.get("pendingDirectorySize").longValue()
+     + pendingDeletionMap.get("pendingKeySize").longValue());
+ return true;
+ } catch (Exception e) {
+ LOG.debug("Waiting for OM pending deletion assertions to pass", e);
+ return false;
+ }
+ }
+
+ private boolean verifyPendingDeletionAfterKeyDeletionScm() {
+ try {
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append(getReconWebAddress(conf)).append(PENDING_DELETION_ENDPOINT).append("?component=scm");
+ String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+ ScmPendingDeletion pendingDeletion = MAPPER.readValue(response, ScmPendingDeletion.class);
+ assertEquals(30, pendingDeletion.getTotalReplicatedBlockSize());
+ assertEquals(10, pendingDeletion.getTotalBlocksize());
+ assertEquals(10, pendingDeletion.getTotalBlocksCount());
+ return true;
+ } catch (Throwable e) {
+ LOG.debug("Waiting for SCM pending deletion assertions to pass", e);
+ return false;
+ }
+ }
+
+ private boolean verifyPendingDeletionAfterKeyDeletionDn() {
+ try {
+ scm.getScmHAManager().asSCMHADBTransactionBuffer().flush();
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append(getReconWebAddress(conf)).append(PENDING_DELETION_ENDPOINT).append("?component=dn");
+ String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+ DataNodeMetricsServiceResponse pendingDeletion =
+     MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
+ assertNotNull(pendingDeletion);
+ assertEquals(30, pendingDeletion.getTotalPendingDeletionSize());
+ assertEquals(DataNodeMetricsService.MetricCollectionStatus.FINISHED, pendingDeletion.getStatus());
+ assertEquals(pendingDeletion.getTotalNodesQueried(), pendingDeletion.getPendingDeletionPerDataNode().size());
+ assertEquals(0, pendingDeletion.getTotalNodeQueryFailures());
+ pendingDeletion.getPendingDeletionPerDataNode().forEach(dn -> {
+ assertEquals(10, dn.getPendingBlockSize());
+ });
+ return true;
+ } catch (Throwable e) {
+ LOG.debug("Waiting for DN pending deletion assertions to pass", e);
+ return false;
+ }
+ }
+
+ private boolean verifyPendingDeletionClearsAtDn() {
+ try {
+ scm.getScmHAManager().asSCMHADBTransactionBuffer().flush();
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append(getReconWebAddress(conf)).append(PENDING_DELETION_ENDPOINT).append("?component=dn");
+ String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+ DataNodeMetricsServiceResponse pendingDeletion =
+     MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
+ assertNotNull(pendingDeletion);
+ assertEquals(0, pendingDeletion.getTotalPendingDeletionSize());
+ assertEquals(DataNodeMetricsService.MetricCollectionStatus.FINISHED, pendingDeletion.getStatus());
+ assertEquals(pendingDeletion.getTotalNodesQueried(), pendingDeletion.getPendingDeletionPerDataNode().size());
+ assertEquals(0, pendingDeletion.getTotalNodeQueryFailures());
+ pendingDeletion.getPendingDeletionPerDataNode().forEach(dn -> {
+ assertEquals(0, dn.getPendingBlockSize());
+ });
+ return true;
+ } catch (Throwable e) {
+ LOG.debug("Waiting for DN pending deletion to clear", e);
+ return false;
+ }
+ }
+
+ private boolean verifyPendingDeletionAfterKeyDeletionOnDnFailure() {
+ try {
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append(getReconWebAddress(conf)).append(PENDING_DELETION_ENDPOINT).append("?component=dn");
+ String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+ DataNodeMetricsServiceResponse pendingDeletion =
+     MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
+ assertNotNull(pendingDeletion);
+ assertEquals(1, pendingDeletion.getTotalNodeQueryFailures());
+ assertTrue(pendingDeletion.getPendingDeletionPerDataNode()
+ .stream()
+ .anyMatch(dn -> dn.getPendingBlockSize() == -1));
+ return true;
+ } catch (Throwable e) {
+ return false;
+ }
}
private void verifyBlocksCreated(
@@ -286,4 +398,12 @@ public static void tear() {
cluster.shutdown();
}
}
+
+ private static void closeAllContainers() {
+ for (ContainerInfo container :
+ scm.getContainerManager().getContainers()) {
+ scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER,
+ container.containerID());
+ }
+ }
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java
index d09c01ea72f..ee99ec9fa2e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java
@@ -17,12 +17,20 @@
package org.apache.hadoop.ozone.recon;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT_DEFAULT;
+
+import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.recon.ReconConfigKeys;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider;
+import org.apache.hadoop.ozone.recon.spi.impl.JmxServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,12 +47,23 @@ public class MetricsServiceProviderFactory {
private OzoneConfiguration configuration;
private ReconUtils reconUtils;
+ private URLConnectionFactory connectionFactory;
@Inject
public MetricsServiceProviderFactory(OzoneConfiguration configuration,
ReconUtils reconUtils) {
this.configuration = configuration;
this.reconUtils = reconUtils;
+ int connectionTimeout = (int) configuration.getTimeDuration(
+ OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT,
+ OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ int connectionRequestTimeout = (int) configuration.getTimeDuration(
+ OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT,
+ OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(
+     connectionTimeout, connectionRequestTimeout, configuration);
}
/**
@@ -62,11 +81,20 @@ public MetricsServiceProvider getMetricsServiceProvider() {
String.format("Choosing Prometheus as Metrics service provider " +
"with configured endpoint: %s", prometheusEndpoint));
}
- return new PrometheusServiceProviderImpl(configuration, reconUtils);
+ return new PrometheusServiceProviderImpl(configuration, reconUtils, connectionFactory);
}
return null;
}
+ /**
+  * Returns a JMX-backed MetricsServiceProvider for the given endpoint.
+  * @param endpoint JMX endpoint URL of the target component.
+  * @return MetricsServiceProvider instance for JMX
+  */
+ public MetricsServiceProvider getJmxMetricsServiceProvider(String endpoint) {
+ return new JmxServiceProviderImpl(reconUtils, endpoint, connectionFactory);
+ }
+
/**
* Returns the Prometheus endpoint if configured. Otherwise returns null.
*
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index ba357f4ba14..dc75200214f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -123,14 +123,14 @@ public final class ReconServerConfigKeys {
public static final String
OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT_DEFAULT =
- "10s";
+ "30s";
public static final String
OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT =
"ozone.recon.metrics.http.connection.request.timeout";
public static final String
- OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "10s";
+ OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "60s";
public static final String OZONE_RECON_SCM_CONTAINER_THRESHOLD =
"ozone.recon.scm.container.threshold";
@@ -213,6 +213,14 @@ public final class ReconServerConfigKeys {
public static final int
OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3;
+ public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY =
+     "ozone.recon.dn.metrics.collection.minimum.api.delay";
+ public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT = "30s";
+
+ public static final String OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT =
+ "ozone.recon.dn.metrics.collection.timeout";
+ public static final String OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT = "10m";
+
/**
* Private constructor for utility class.
*/
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index ccc92648f11..0594169f03c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -847,4 +847,34 @@ public static String constructObjectPathWithPrefix(long... ids) {
}
return pathBuilder.toString();
}
+
+ public static Map<String, Object> getMetricsData(List<Map<String, Object>> metrics, String beanName) {
+ if (metrics == null || StringUtils.isEmpty(beanName)) {
+ return null;
+ }
+ for (Map<String, Object> item : metrics) {
+ if (beanName.equals(item.get("name"))) {
+ return item;
+ }
+ }
+ return null;
+ }
+
+ public static long extractLongMetricValue(Map<String, Object> metrics, String keyName) {
+ if (metrics == null || StringUtils.isEmpty(keyName)) {
+ return -1;
+ }
+ Object value = metrics.get(keyName);
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+ if (value instanceof String) {
+ try {
+ return Long.parseLong((String) value);
+ } catch (NumberFormatException e) {
+ log.error("Failed to parse long value for key: {} with value: {}", keyName, value, e);
+ }
+ }
+ return -1;
+ }
}
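Note: a minimal sketch of how these two new ReconUtils helpers compose when reading a parsed JMX bean dump (the bean and attribute names below are illustrative, not taken from this patch):

    // "beans" is the List<Map<String, Object>> returned by a metrics provider.
    Map<String, Object> bean = ReconUtils.getMetricsData(beans,
        "Hadoop:service=HddsDatanode,name=BlockDeletingService");
    // Returns -1 when the bean is absent or the attribute is missing/unparsable.
    long pendingSize = ReconUtils.extractLongMetricValue(bean, "PendingBlockSize");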
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java
new file mode 100644
index 00000000000..c37e8e65ace
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory;
+import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
+import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service for collecting and managing DataNode pending deletion metrics.
+ * Collects metrics asynchronously from all datanodes and provides aggregated results.
+ */
+@Singleton
+public class DataNodeMetricsService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class);
+ private static final int MAX_POOL_SIZE = 500;
+ private static final int KEEP_ALIVE_TIME = 5;
+ private static final int POLL_INTERVAL_MS = 200;
+
+ private final ThreadPoolExecutor executorService;
+ private final ReconNodeManager reconNodeManager;
+ private final boolean httpsEnabled;
+ private final int minimumApiDelayMs;
+ private final MetricsServiceProviderFactory metricsServiceProviderFactory;
+ private final int maximumTaskTimeout;
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+ private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED;
+ private List<DatanodePendingDeletionMetrics> pendingDeletionList;
+ private Long totalPendingDeletion = 0L;
+ private int totalNodesQueried;
+ private int totalNodesFailed;
+ private AtomicLong lastCollectionEndTime = new AtomicLong(0L);
+
+ @Inject
+ public DataNodeMetricsService(
+ OzoneStorageContainerManager reconSCM,
+ OzoneConfiguration config,
+ MetricsServiceProviderFactory metricsServiceProviderFactory) {
+
+ this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager();
+ this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled();
+ this.minimumApiDelayMs = (int) config.getTimeDuration(
+ OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY,
+ OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.maximumTaskTimeout = (int) config.getTimeDuration(
+     OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT,
+     OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+ this.metricsServiceProviderFactory = metricsServiceProviderFactory;
+ this.lastCollectionEndTime.set(-minimumApiDelayMs);
+ int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
+ this.executorService = new ThreadPoolExecutor(
+ corePoolSize, MAX_POOL_SIZE,
+ KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder()
+ .setNameFormat("DataNodeMetricsCollector-%d")
+ .build());
+ }
+
+ /**
+ * Starts the metrics collection task if it is not already running and the rate limit allows.
+ */
+ public void startTask() {
+ // Check if already running
+ if (!isRunning.compareAndSet(false, true)) {
+ LOG.warn("Metrics collection already in progress, skipping");
+ return;
+ }
+
+ // Check rate limit
+ if (System.currentTimeMillis() - lastCollectionEndTime.get() < minimumApiDelayMs) {
+ LOG.debug("Rate limit active, skipping collection (delay: {}ms)", minimumApiDelayMs);
+ isRunning.set(false);
+ return;
+ }
+
+ Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet();
+ if (nodes.isEmpty()) {
+ LOG.warn("No datanodes found to query");
+ resetState();
+ currentStatus = MetricCollectionStatus.FINISHED;
+ isRunning.set(false);
+ return;
+ }
+
+ // Set status immediately before starting async collection
+ currentStatus = MetricCollectionStatus.IN_PROGRESS;
+ LOG.debug("Starting metrics collection for {} datanodes", nodes.size());
+
+ // Run a collection asynchronously so status can be queried
+ CompletableFuture.runAsync(() -> collectMetrics(nodes), executorService)
+ .exceptionally(throwable -> {
+ LOG.error("Metrics collection failed", throwable);
+ synchronized (DataNodeMetricsService.this) {
+ currentStatus = MetricCollectionStatus.FINISHED;
+ isRunning.set(false);
+ }
+ return null;
+ });
+ }
+
+ /**
+ * Collects metrics from all datanodes. Processes completed tasks first, waits for all.
+ */
+ private void collectMetrics(Set<DatanodeDetails> nodes) {
+ try {
+ CollectionContext context = submitMetricsCollectionTasks(nodes);
+ processCollectionFutures(context);
+ updateFinalState(context);
+ } catch (Exception e) {
+ resetState();
+ currentStatus = MetricCollectionStatus.FAILED;
+ isRunning.set(false);
+ }
+ }
+
+ /**
+ * Submits metrics collection tasks for all given datanodes.
+  * @return A context object containing tracking structures for the submitted futures.
+  */
+ private CollectionContext submitMetricsCollectionTasks(Set<DatanodeDetails> nodes) {
+ // Initialize state
+ List<DatanodePendingDeletionMetrics> results = new ArrayList<>(nodes.size());
+ // Submit all collection tasks
+ Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures = new HashMap<>();
+
+ long submissionTime = System.currentTimeMillis();
+ for (DatanodeDetails node : nodes) {
+ DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask(
+ node, httpsEnabled, metricsServiceProviderFactory);
+ DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics(
+     node.getHostName(), node.getUuidString(), -1L); // -1 is the placeholder for a failed/missing result
+ futures.put(key, executorService.submit(task));
+ }
+ int totalQueried = futures.size();
+ LOG.debug("Submitted {} collection tasks", totalQueried);
+ return new CollectionContext(totalQueried, futures, submissionTime, results);
+ }
+
+ /**
+  * Polls the submitted futures, enforcing timeouts and aggregating results until all are complete.
+ */
+ private void processCollectionFutures(CollectionContext context) {
+ // Poll with timeout enforcement
+ while (!context.futures.isEmpty()) {
+ long currentTime = System.currentTimeMillis();
+ Iterator<Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>>> iterator =
+     context.futures.entrySet().iterator();
+ boolean processedAny = false;
+ while (iterator.hasNext()) {
+ Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> entry = iterator.next();
+ DatanodePendingDeletionMetrics key = entry.getKey();
+ Future<DatanodePendingDeletionMetrics> future = entry.getValue();
+ // Check for timeout
+ if (checkAndHandleTimeout(key, future, context, currentTime)) {
+ iterator.remove();
+ processedAny = true;
+ continue;
+ }
+ // Check for completion
+ if (future.isDone()) {
+ handleCompletedFuture(key, future, context);
+ iterator.remove();
+ processedAny = true;
+ }
+ }
+ // Sleep before the next poll only if there are remaining futures and nothing was processed
+ if (!context.futures.isEmpty() && !processedAny) {
+ try {
+ Thread.sleep(POLL_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ LOG.warn("Collection polling interrupted");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ }
+
+ private boolean checkAndHandleTimeout(
+     DatanodePendingDeletionMetrics key, Future<DatanodePendingDeletionMetrics> future,
+ CollectionContext context, long currentTime) {
+ long elapsedTime = currentTime - context.submissionTime;
+ if (elapsedTime > maximumTaskTimeout && !future.isDone()) {
+ LOG.warn("Task for datanode {} [{}] timed out after {}ms",
+ key.getHostName(), key.getDatanodeUuid(), elapsedTime);
+ future.cancel(true); // Interrupt the task
+ context.failed++;
+ context.results.add(key); // Add with -1 (failed)
+ return true;
+ }
+ return false;
+ }
+
+ private void handleCompletedFuture(
+     DatanodePendingDeletionMetrics key, Future<DatanodePendingDeletionMetrics> future,
+ CollectionContext context) {
+ try {
+ DatanodePendingDeletionMetrics result = future.get();
+ if (result.getPendingBlockSize() < 0) {
+ context.failed++;
+ } else {
+ context.totalPending += result.getPendingBlockSize();
+ }
+ context.results.add(result);
+ LOG.debug("Processed result from {}", key.getHostName());
+ } catch (ExecutionException | InterruptedException e) {
+ String errorType = e instanceof InterruptedException ? "interrupted" : "execution failed";
+ LOG.error("Task {} for datanode {} [{}] failed",
+ errorType, key.getHostName(), key.getDatanodeUuid(), e);
+ context.failed++;
+ context.results.add(key);
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+  * Atomically updates the class's shared state with the results from the collection context.
+ */
+ private void updateFinalState(CollectionContext context) {
+ // Update shared state atomically
+ synchronized (this) {
+ pendingDeletionList = context.results;
+ totalPendingDeletion = context.totalPending;
+ totalNodesQueried = context.totalQueried;
+ totalNodesFailed = context.failed;
+ currentStatus = MetricCollectionStatus.FINISHED;
+ isRunning.set(false);
+ lastCollectionEndTime.set(System.currentTimeMillis());
+ }
+
+ LOG.debug("Metrics collection completed. Queried: {}, Failed: {}",
+ context.totalQueried, context.failed);
+ }
+
+ /**
+ * Resets the collection state.
+ */
+ private void resetState() {
+ pendingDeletionList = new ArrayList<>();
+ totalPendingDeletion = 0L;
+ totalNodesQueried = 0;
+ totalNodesFailed = 0;
+ }
+
+ public DataNodeMetricsServiceResponse getCollectedMetrics() {
+ startTask();
+ if (currentStatus == MetricCollectionStatus.FINISHED) {
+ return DataNodeMetricsServiceResponse.newBuilder()
+ .setStatus(currentStatus)
+ .setPendingDeletion(pendingDeletionList)
+ .setTotalPendingDeletionSize(totalPendingDeletion)
+ .setTotalNodesQueried(totalNodesQueried)
+ .setTotalNodeQueryFailures(totalNodesFailed)
+ .build();
+ }
+ return DataNodeMetricsServiceResponse.newBuilder()
+ .setStatus(currentStatus)
+ .build();
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ LOG.info("Shutting down DataNodeMetricsService");
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executorService.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Status of metric collection task.
+ */
+ public enum MetricCollectionStatus {
+ NOT_STARTED, IN_PROGRESS, FINISHED, FAILED
+ }
+
+ private static class CollectionContext {
+ private final int totalQueried;
+ private final Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures;
+ private final List<DatanodePendingDeletionMetrics> results;
+ private final long submissionTime;
+ private long totalPending = 0L;
+ private int failed = 0;
+
+ CollectionContext(
+ int totalQueried,
+     Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures,
+ long submissionTime,
+ List<DatanodePendingDeletionMetrics> results) {
+ this.totalQueried = totalQueried;
+ this.futures = futures;
+ this.submissionTime = submissionTime;
+ this.results = results;
+ }
+ }
+}
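Note: getCollectedMetrics() starts a collection on the first call and returns immediately, so callers are expected to re-poll until the status is FINISHED. A client-side sketch of that contract, exception handling elided (this is not part of the patch):

    DataNodeMetricsServiceResponse r = service.getCollectedMetrics();
    while (r.getStatus() == DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) {
      Thread.sleep(1000);                 // back off between polls
      r = service.getCollectedMetrics();  // the minimum-api-delay rate limiter avoids duplicate task starts
    }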
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java
new file mode 100644
index 00000000000..2fbb9c6bb8d
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
+import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST API endpoint that provides metrics and information related to
+ * pending deletions. It responds to requests on the "/pendingDeletion" path
+ * and produces application/json responses.
+ */
+@Path("/pendingDeletion")
+@Produces("application/json")
+@AdminOnly
+public class PendingDeletionEndpoint {
+ private static final Logger LOG = LoggerFactory.getLogger(PendingDeletionEndpoint.class);
+ private final ReconGlobalMetricsService reconGlobalMetricsService;
+ private final DataNodeMetricsService dataNodeMetricsService;
+ private final StorageContainerLocationProtocol scmClient;
+
+ @Inject
+ public PendingDeletionEndpoint(
+ ReconGlobalMetricsService reconGlobalMetricsService,
+ DataNodeMetricsService dataNodeMetricsService,
+ StorageContainerLocationProtocol scmClient) {
+ this.reconGlobalMetricsService = reconGlobalMetricsService;
+ this.dataNodeMetricsService = dataNodeMetricsService;
+ this.scmClient = scmClient;
+ }
+
+ @GET
+ public Response getPendingDeletionByComponent(@QueryParam("component") String component) {
+ if (component == null || component.isEmpty()) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity("component query parameter is required").build();
+ }
+ final String normalizedComponent = component.trim().toLowerCase();
+ switch (normalizedComponent) {
+ case "dn":
+ return handleDataNodeMetrics();
+ case "scm":
+ return handleScmPendingDeletion();
+ case "om":
+ return handleOmPendingDeletion();
+ default:
+ return Response.status(Response.Status.BAD_REQUEST)
+     .entity("component query parameter must be one of dn, scm, om").build();
+ }
+ }
+
+ private Response handleDataNodeMetrics() {
+ DataNodeMetricsServiceResponse response = dataNodeMetricsService.getCollectedMetrics();
+ if (response.getStatus() == DataNodeMetricsService.MetricCollectionStatus.FINISHED) {
+ return Response.ok(response).build();
+ } else {
+ return Response.accepted(response).build();
+ }
+ }
+
+ private Response handleScmPendingDeletion() {
+ try {
+ HddsProtos.DeletedBlocksTransactionSummary summary = scmClient.getDeletedBlockSummary();
+ if (summary == null) {
+ return Response.noContent()
+ .build();
+ }
+ ScmPendingDeletion pendingDeletion = new ScmPendingDeletion(
+ summary.getTotalBlockSize(),
+ summary.getTotalBlockReplicatedSize(),
+ summary.getTotalBlockCount());
+ return Response.ok(pendingDeletion).build();
+ } catch (Exception e) {
+ LOG.error("Failed to get pending deletion info from SCM", e);
+ ScmPendingDeletion pendingDeletion = new ScmPendingDeletion(-1L, -1L, -1L);
+ return Response.ok(pendingDeletion).build();
+ }
+ }
+
+ private Response handleOmPendingDeletion() {
+ Map<String, Long> pendingDeletion = reconGlobalMetricsService.calculatePendingSizes();
+ return Response.ok(pendingDeletion).build();
+ }
+}
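Note: as exercised by the integration test above, the endpoint is served under Recon's API root, so the three components are queried as:

    GET /api/v1/pendingDeletion?component=om   -> map with pendingDirectorySize, pendingKeySize, totalSize
    GET /api/v1/pendingDeletion?component=scm  -> ScmPendingDeletion JSON
    GET /api/v1/pendingDeletion?component=dn   -> 200 with metrics once collection has FINISHED; 202 Accepted while still in progress

A missing or unrecognized component value yields 400 Bad Request.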
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ReconGlobalMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ReconGlobalMetricsService.java
index b08727aab40..0796fafd90b 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ReconGlobalMetricsService.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ReconGlobalMetricsService.java
@@ -219,11 +219,32 @@ public KeyInsightInfoResponse getPendingForDeletionDirInfo(int limit, String pre
public Map<String, Long> calculatePendingSizes() {
Map<String, Long> result = new HashMap<>();
- KeyInsightInfoResponse response = getPendingForDeletionDirInfo(-1, "");
- Map<String, Long> pendingKeySize = getDeletedKeySummary();
- result.put("pendingDirectorySize", response.getReplicatedDataSize());
- result.put("pendingKeySize",
pendingKeySize.getOrDefault("totalReplicatedDataSize", 0L));
- result.put("totalSize", result.get("pendingDirectorySize") +
result.get("pendingKeySize"));
+ long pendingDirectorySize = -1L;
+ long pendingKeySizeValue = -1L;
+
+ // Get the pending deletion directory size
+ try {
+ KeyInsightInfoResponse response = getPendingForDeletionDirInfo(-1, "");
+ pendingDirectorySize = response.getReplicatedDataSize();
+ } catch (Exception ex) {
+ LOG.error("Error calculating pending directory size", ex);
+ }
+ result.put("pendingDirectorySize", pendingDirectorySize);
+
+ // Get the pending deletion key size
+ try {
+ Map<String, Long> pendingKeySizeMap = getDeletedKeySummary();
+ pendingKeySizeValue = pendingKeySizeMap.getOrDefault("totalReplicatedDataSize", 0L);
+ } catch (Exception ex) {
+ LOG.error("Error calculating pending key size", ex);
+ }
+ result.put("pendingKeySize", pendingKeySizeValue);
+
+ if (pendingDirectorySize < 0 || pendingKeySizeValue < 0) {
+ result.put("totalSize", -1L);
+ } else {
+ result.put("totalSize", pendingDirectorySize + pendingKeySizeValue);
+ }
return result;
}
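Note on the error contract above: if either sub-metric lookup throws, its entry is reported as -1 and totalSize collapses to -1 rather than summing a partial result, e.g. a response of the shape {"pendingDirectorySize": -1, "pendingKeySize": 1024, "totalSize": -1} (values illustrative).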
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java
new file mode 100644
index 00000000000..bd1284d60ee
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService;
+
+/**
+ * Represents a response from the DataNodeMetricsService.
+ * This class encapsulates the result of a metrics collection task,
+ * including the collection status, total pending deletions across all data nodes,
+ * and details about pending deletions for each data node.
+ *
+ * Instances of this class are created using the {@link Builder} class.
+ */
+public class DataNodeMetricsServiceResponse {
+ @JsonProperty("status")
+ private DataNodeMetricsService.MetricCollectionStatus status;
+ @JsonProperty("totalPendingDeletionSize")
+ private Long totalPendingDeletionSize;
+ @JsonProperty("pendingDeletionPerDataNode")
+ private List<DatanodePendingDeletionMetrics> pendingDeletionPerDataNode;
+ @JsonProperty("totalNodesQueried")
+ private int totalNodesQueried;
+ @JsonProperty("totalNodeQueriesFailed")
+ private long totalNodeQueryFailures;
+
+ public DataNodeMetricsServiceResponse(Builder builder) {
+ this.status = builder.status;
+ this.totalPendingDeletionSize = builder.totalPendingDeletionSize;
+ this.pendingDeletionPerDataNode = builder.pendingDeletion;
+ this.totalNodesQueried = builder.totalNodesQueried;
+ this.totalNodeQueryFailures = builder.totalNodeQueryFailures;
+ }
+
+ public DataNodeMetricsServiceResponse() {
+ this.status = DataNodeMetricsService.MetricCollectionStatus.NOT_STARTED;
+ this.totalPendingDeletionSize = 0L;
+ this.pendingDeletionPerDataNode = null;
+ this.totalNodesQueried = 0;
+ this.totalNodeQueryFailures = 0;
+ }
+
+ public DataNodeMetricsService.MetricCollectionStatus getStatus() {
+ return status;
+ }
+
+ public Long getTotalPendingDeletionSize() {
+ return totalPendingDeletionSize;
+ }
+
+ public List<DatanodePendingDeletionMetrics> getPendingDeletionPerDataNode() {
+ return pendingDeletionPerDataNode;
+ }
+
+ public int getTotalNodesQueried() {
+ return totalNodesQueried;
+ }
+
+ public long getTotalNodeQueryFailures() {
+ return totalNodeQueryFailures;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+  * Builder class for constructing instances of {@link DataNodeMetricsServiceResponse}.
+  * This class provides a fluent interface for setting the various properties
+  * of a DataNodeMetricsServiceResponse object before creating a new immutable instance.
+ * The Builder is designed to be used in a staged and intuitive manner.
+ * The properties that can be configured include:
+ * - Status of the metric collection process.
+ * - Total number of blocks pending deletion across all data nodes.
+ * - Metrics related to pending deletions from individual data nodes.
+ */
+ public static final class Builder {
+ private DataNodeMetricsService.MetricCollectionStatus status;
+ private Long totalPendingDeletionSize;
+ private List<DatanodePendingDeletionMetrics> pendingDeletion;
+ private int totalNodesQueried;
+ private long totalNodeQueryFailures;
+
+ public Builder setStatus(DataNodeMetricsService.MetricCollectionStatus status) {
+ this.status = status;
+ return this;
+ }
+
+ public Builder setTotalPendingDeletionSize(Long totalPendingDeletionSize) {
+ this.totalPendingDeletionSize = totalPendingDeletionSize;
+ return this;
+ }
+
+ public Builder setPendingDeletion(List<DatanodePendingDeletionMetrics> pendingDeletion) {
+ this.pendingDeletion = pendingDeletion;
+ return this;
+ }
+
+ public Builder setTotalNodesQueried(int totalNodesQueried) {
+ this.totalNodesQueried = totalNodesQueried;
+ return this;
+ }
+
+ public Builder setTotalNodeQueryFailures(long totalNodeQueryFailures) {
+ this.totalNodeQueryFailures = totalNodeQueryFailures;
+ return this;
+ }
+
+ public DataNodeMetricsServiceResponse build() {
+ return new DataNodeMetricsServiceResponse(this);
+ }
+ }
+}
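Note: given the Jackson annotations above, a FINISHED response serializes roughly as follows (values illustrative; note the JSON field name totalNodeQueriesFailed for the totalNodeQueryFailures property):

    {
      "status": "FINISHED",
      "totalPendingDeletionSize": 30,
      "pendingDeletionPerDataNode": [
        {"hostName": "dn-1", "datanodeUuid": "uuid-1", "pendingBlockSize": 10}
      ],
      "totalNodesQueried": 3,
      "totalNodeQueriesFailed": 0
    }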
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java
new file mode 100644
index 00000000000..964add57309
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents pending deletion metrics for a datanode.
+ * This class encapsulates information about blocks pending deletion on a specific datanode.
+ */
+public class DatanodePendingDeletionMetrics {
+
+ @JsonProperty("hostName")
+ private final String hostName;
+
+ @JsonProperty("datanodeUuid")
+ private final String datanodeUuid;
+
+ @JsonProperty("pendingBlockSize")
+ private final long pendingBlockSize;
+
+ @JsonCreator
+ public DatanodePendingDeletionMetrics(
+ @JsonProperty("hostName") String hostName,
+ @JsonProperty("datanodeUuid") String datanodeUuid,
+ @JsonProperty("pendingBlockSize") long pendingBlockSize) {
+ this.hostName = hostName;
+ this.datanodeUuid = datanodeUuid;
+ this.pendingBlockSize = pendingBlockSize;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public long getPendingBlockSize() {
+ return pendingBlockSize;
+ }
+
+ public String getDatanodeUuid() {
+ return datanodeUuid;
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java
new file mode 100644
index 00000000000..357189857a3
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents metadata related to pending deletions in the Storage Container Manager (SCM).
+ * This class encapsulates information such as the total block size, the total size of
+ * replicated blocks, and the total number of blocks awaiting deletion.
+ */
+public class ScmPendingDeletion {
+ @JsonProperty("totalBlocksize")
+ private final long totalBlocksize;
+ @JsonProperty("totalReplicatedBlockSize")
+ private final long totalReplicatedBlockSize;
+ @JsonProperty("totalBlocksCount")
+ private final long totalBlocksCount;
+
+ public ScmPendingDeletion() {
+ this.totalBlocksize = 0;
+ this.totalReplicatedBlockSize = 0;
+ this.totalBlocksCount = 0;
+ }
+
+ public ScmPendingDeletion(long size, long replicatedSize, long totalBlocks) {
+ this.totalBlocksize = size;
+ this.totalReplicatedBlockSize = replicatedSize;
+ this.totalBlocksCount = totalBlocks;
+ }
+
+ public long getTotalBlocksize() {
+ return totalBlocksize;
+ }
+
+ public long getTotalReplicatedBlockSize() {
+ return totalReplicatedBlockSize;
+ }
+
+ public long getTotalBlocksCount() {
+ return totalBlocksCount;
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java
index 3d0aa314067..7ddba027bfe 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java
@@ -19,6 +19,7 @@
import java.net.HttpURLConnection;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.ozone.recon.metrics.Metric;
/**
@@ -48,12 +49,12 @@ HttpURLConnection getMetricsResponse(String api, String queryString)
List<Metric> getMetricsInstant(String queryString) throws Exception;
/**
- * Returns a list of {@link Metric} for the given ranged query.
+ * Returns a list of {@code Map<String, Object>} entries for the given query.
*
* @param queryString query string with metric name, start time, end time,
* step and other filters.
* @return List of Json map of metrics response.
* @throws Exception exception
*/
- List<Metric> getMetricsRanged(String queryString) throws Exception;
+ List<Map<String, Object>> getMetrics(String queryString) throws Exception;
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java
new file mode 100644
index 00000000000..4249d215e33
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Singleton;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.hdds.server.JsonUtils;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.metrics.Metric;
+import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider;
+
+/**
+ * Implementation of the JMX metrics service provider.
+ */
+@Singleton
+public class JmxServiceProviderImpl implements MetricsServiceProvider {
+
+ public static final String JMX_INSTANT_QUERY_API = "qry";
+ private URLConnectionFactory connectionFactory;
+ private final String jmxEndpoint;
+ private ReconUtils reconUtils;
+
+ public JmxServiceProviderImpl(
+ ReconUtils reconUtils,
+ String jmxEndpoint,
+ URLConnectionFactory connectionFactory) {
+ // Remove the trailing slash from endpoint url.
+ if (jmxEndpoint != null && jmxEndpoint.endsWith("/")) {
+ jmxEndpoint = jmxEndpoint.substring(0, jmxEndpoint.length() - 1);
+ }
+ this.jmxEndpoint = jmxEndpoint;
+ this.reconUtils = reconUtils;
+ this.connectionFactory = connectionFactory;
+ }
+
+ /**
+ * Returns {@link HttpURLConnection} after querying Metrics endpoint for the
+ * given metric.
+ *
+ * @param api api.
+ * @param queryString query string with metric name and other filters.
+ * @return HttpURLConnection
+ * @throws Exception exception
+ */
+ @Override
+ public HttpURLConnection getMetricsResponse(String api, String queryString)
+ throws Exception {
+ String url = String.format("%s?%s=%s", jmxEndpoint, api,
+ queryString);
+ return reconUtils.makeHttpCall(connectionFactory,
+ url, false);
+ }
+
+ @Override
+ public List<Metric> getMetricsInstant(String queryString) throws Exception {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Returns a list of {@link Metric} for the given instant query.
+ *
+ * @param queryString query string with metric name and other filters.
+ * @return List of Json map of metrics response.
+ * @throws Exception exception
+ */
+ @Override
+ public List<Map<String, Object>> getMetrics(String queryString)
+ throws Exception {
+ return getMetrics(JMX_INSTANT_QUERY_API, queryString);
+ }
+
+ /**
+ * Returns a list of {@link Metric} for the given api and query string.
+ *
+ * @param api api
+ * @param queryString query string with metric name and other filters.
+ * @return List of Json map of metrics response.
+ * @throws Exception
+ */
+ private List<Map<String, Object>> getMetrics(String api, String queryString)
+ throws Exception {
+ HttpURLConnection urlConnection =
+ getMetricsResponse(api, queryString);
+ if (Response.Status.fromStatusCode(urlConnection.getResponseCode())
+ .getFamily() == Response.Status.Family.SUCCESSFUL) {
+ try (InputStream inputStream = urlConnection.getInputStream()) {
+ Map<String, Object> jsonMap =
JsonUtils.getDefaultMapper().readValue(inputStream, Map.class);
+ Object beansObj = jsonMap.get("beans");
+ if (beansObj instanceof List) {
+ return (List<Map<String, Object>>) beansObj;
+ }
+ }
+ }
+ return Collections.emptyList();
+ }
+}
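To make the qry parameter above concrete, a small sketch of the URL this provider builds; the host and port are placeholders, and the response shape follows the standard Hadoop /jmx servlet:

    public final class JmxQueryUrlSketch {
      public static void main(String[] args) {
        // Placeholder endpoint; mirrors the String.format in getMetricsResponse.
        String jmxEndpoint = "http://dn-host:9882/jmx";
        String query = "Hadoop:service=HddsDatanode,name=BlockDeletingService";
        String url = String.format("%s?%s=%s", jmxEndpoint, "qry", query);
        System.out.println(url);
        // -> http://dn-host:9882/jmx?qry=Hadoop:service=HddsDatanode,name=BlockDeletingService
        // The servlet replies with {"beans": [...]}, one JSON map per matching
        // MBean, which getMetrics() unwraps into a List<Map<String, Object>>.
      }
    }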
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java
index 64c00490613..0a784c50c90 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java
@@ -17,19 +17,14 @@
package org.apache.hadoop.ozone.recon.spi.impl;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT_DEFAULT;
-
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
import javax.ws.rs.core.Response;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -49,7 +44,6 @@ public class PrometheusServiceProviderImpl
implements MetricsServiceProvider {
public static final String PROMETHEUS_INSTANT_QUERY_API = "query";
- public static final String PROMETHEUS_RANGED_QUERY_API = "query_range";
private static final Logger LOG =
LoggerFactory.getLogger(PrometheusServiceProviderImpl.class);
@@ -58,21 +52,12 @@ public class PrometheusServiceProviderImpl
private final String prometheusEndpoint;
private ReconUtils reconUtils;
- public PrometheusServiceProviderImpl(OzoneConfiguration configuration,
- ReconUtils reconUtils) {
-
- int connectionTimeout = (int) configuration.getTimeDuration(
- OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT,
- OZONE_RECON_METRICS_HTTP_CONNECTION_TIMEOUT_DEFAULT,
- TimeUnit.MILLISECONDS);
- int connectionRequestTimeout = (int) configuration.getTimeDuration(
- OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT,
- OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT_DEFAULT,
- TimeUnit.MILLISECONDS);
+ public PrometheusServiceProviderImpl(
+ OzoneConfiguration configuration,
+ ReconUtils reconUtils,
+ URLConnectionFactory connectionFactory) {
- connectionFactory =
- URLConnectionFactory.newDefaultURLConnectionFactory(connectionTimeout,
- connectionRequestTimeout, configuration);
+ this.connectionFactory = connectionFactory;
String endpoint = configuration.getTrimmed(getEndpointConfigKey());
// Remove the trailing slash from endpoint url.
@@ -123,17 +108,9 @@ public List<Metric> getMetricsInstant(String queryString)
return getMetrics(PROMETHEUS_INSTANT_QUERY_API, queryString);
}
- /**
- * Returns a list of {@link Metric} for the given ranged query.
- *
- * @param queryString query string with metric name, start time, end time,
- * step and other filters.
- * @return List of Json map of metrics response.
- * @throws Exception exception
- */
   @Override
-  public List<Metric> getMetricsRanged(String queryString) throws Exception {
-    return getMetrics(PROMETHEUS_RANGED_QUERY_API, queryString);
+  public List<Map<String, Object>> getMetrics(String queryString)
+      throws Exception {
+    // Map-based retrieval is only implemented by the JMX provider; Prometheus
+    // callers should use getMetricsInstant().
+    return Collections.emptyList();
   }
/**
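The constructor change above moves URLConnectionFactory construction out of the provider and into the caller. A hedged sketch of that wiring, using the same factory method the removed code called; the 10-second timeout values are assumptions, not the configured defaults:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdfs.web.URLConnectionFactory;

    public final class ConnectionFactoryWiringSketch {
      // Hypothetical helper: builds the factory now injected into both the
      // Prometheus and JMX providers.
      static URLConnectionFactory newFactory(OzoneConfiguration conf) {
        int connectionTimeout = (int) TimeUnit.SECONDS.toMillis(10);        // assumed
        int connectionRequestTimeout = (int) TimeUnit.SECONDS.toMillis(10); // assumed
        return URLConnectionFactory.newDefaultURLConnectionFactory(
            connectionTimeout, connectionRequestTimeout, conf);
      }
    }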
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java
new file mode 100644
index 00000000000..f12627a202a
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
+import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
+import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task for collecting pending deletion metrics from a DataNode using JMX.
+ * This class implements the Callable interface and retrieves pending deletion
+ * information (e.g., total pending block bytes) from a DataNode by invoking
+ * its JMX endpoint. The metrics are parsed and encapsulated in the
+ * {@link DatanodePendingDeletionMetrics} object.
+ */
+public class DataNodeMetricsCollectionTask
+    implements Callable<DatanodePendingDeletionMetrics> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataNodeMetricsCollectionTask.class);
+  private static final String BEAN_NAME =
+      "Hadoop:service=HddsDatanode,name=BlockDeletingService";
+  private static final String METRICS_KEY = "TotalPendingBlockBytes";
+  private final DatanodeDetails nodeDetails;
+  private final boolean httpsEnabled;
+  private final MetricsServiceProvider metricsServiceProvider;
+
+  public DataNodeMetricsCollectionTask(
+      DatanodeDetails nodeDetails,
+      boolean httpsEnabled,
+      MetricsServiceProviderFactory factory) {
+    this.nodeDetails = nodeDetails;
+    this.httpsEnabled = httpsEnabled;
+    this.metricsServiceProvider =
+        factory.getJmxMetricsServiceProvider(getJmxMetricsUrl());
+  }
+
+  @Override
+  public DatanodePendingDeletionMetrics call() {
+    LOG.debug("Collecting pending deletion metrics from DataNode {}",
+        nodeDetails.getHostName());
+    try {
+      List<Map<String, Object>> metrics =
+          metricsServiceProvider.getMetrics(BEAN_NAME);
+      if (metrics == null || metrics.isEmpty()) {
+        // -1 signals that the metric could not be collected for this node.
+        return new DatanodePendingDeletionMetrics(
+            nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L);
+      }
+      Map<String, Object> deletionMetrics =
+          ReconUtils.getMetricsData(metrics, BEAN_NAME);
+      long pendingBlockSize =
+          ReconUtils.extractLongMetricValue(deletionMetrics, METRICS_KEY);
+
+      return new DatanodePendingDeletionMetrics(
+          nodeDetails.getHostName(), nodeDetails.getUuidString(),
+          pendingBlockSize);
+
+    } catch (Exception e) {
+      LOG.error("Failed to collect metrics from DataNode {}",
+          nodeDetails.getHostName(), e);
+      return new DatanodePendingDeletionMetrics(
+          nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L);
+    }
+  }
+
+  private String getJmxMetricsUrl() {
+    String protocol = httpsEnabled ? "https" : "http";
+    Name portName = httpsEnabled ? DatanodeDetails.Port.Name.HTTPS
+        : DatanodeDetails.Port.Name.HTTP;
+    return String.format("%s://%s:%d/jmx",
+        protocol,
+        nodeDetails.getHostName(),
+        nodeDetails.getPort(portName).getValue());
+  }
+}
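Since the task is a plain Callable, it is meant to be driven by an executor. A rough sketch of one way to fan it out with a bounded wait; the driver class, pool handling, and timeout value are illustrative assumptions (the real scheduling lives in the accompanying service code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory;
    import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
    import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask;

    public final class CollectionDriverSketch {
      // Hypothetical driver: submit the task for one node and bound the wait.
      static DatanodePendingDeletionMetrics collect(
          DatanodeDetails node, MetricsServiceProviderFactory factory,
          ExecutorService executor, long timeoutMillis) {
        Future<DatanodePendingDeletionMetrics> future =
            executor.submit(new DataNodeMetricsCollectionTask(node, false, factory));
        try {
          return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          future.cancel(true); // drop stragglers past the deadline
          return null;
        } catch (Exception e) {
          return null;
        }
      }
    }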