sumitagrawl commented on code in PR #9413: URL: https://github.com/apache/ozone/pull/9413#discussion_r2620178001
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import 
org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for collecting and managing DataNode pending deletion metrics. + * Collects metrics asynchronously from all datanodes and provides aggregated results. + */ +@Singleton +public class DataNodeMetricsService { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static final int MAX_POOL_SIZE = 500; + private static final int KEEP_ALIVE_TIME = 60; + private static final int QUEUE_CAPACITY = 500; + private static final int POLL_INTERVAL_MS = 200; + private static final int PER_NODE_TIMEOUT_MS = 120000; + + private final ThreadPoolExecutor executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelayMs; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private List<DatanodePendingDeletionMetrics> pendingDeletionList; + private Long totalPendingDeletion = 0L; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + + this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + this.httpsEnabled = 
HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + this.minimumApiDelayMs = (int) config.getTimeDuration( + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; + this.lastCollectionEndTime = 0; + + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + this.executorService = new ThreadPoolExecutor( + corePoolSize, MAX_POOL_SIZE, Review Comment: We can set the core pool size to 0, so that no threads are kept alive when the Recon UI is not being used. KEEP_ALIVE_TIME can be 5 seconds -- since this is not a mission-critical operation, threads can be created on demand. (Note: with a bounded LinkedBlockingQueue, a ThreadPoolExecutor with a core size of 0 runs only a single worker thread until the queue is full; keeping a non-zero core size and calling allowCoreThreadTimeOut(true) releases idle threads without limiting parallelism.) ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for collecting and managing DataNode pending deletion metrics. + * Collects metrics asynchronously from all datanodes and provides aggregated results. 
+ */ +@Singleton +public class DataNodeMetricsService { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static final int MAX_POOL_SIZE = 500; + private static final int KEEP_ALIVE_TIME = 60; + private static final int QUEUE_CAPACITY = 500; + private static final int POLL_INTERVAL_MS = 200; + private static final int PER_NODE_TIMEOUT_MS = 120000; + + private final ThreadPoolExecutor executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelayMs; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private List<DatanodePendingDeletionMetrics> pendingDeletionList; + private Long totalPendingDeletion = 0L; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + + this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + this.minimumApiDelayMs = (int) config.getTimeDuration( + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; + this.lastCollectionEndTime = 0; + + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + this.executorService = new ThreadPoolExecutor( + corePoolSize, MAX_POOL_SIZE, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(QUEUE_CAPACITY), + new ThreadFactoryBuilder() + .setNameFormat("DataNodeMetricsCollector-%d") + .build()); + } + + /** + * Starts the metrics collection task if not already running and 
rate limit allows. + */ + public synchronized void startTask() { + // Check if already running + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { + LOG.warn("Metrics collection already in progress, skipping"); + return; + } + + // Check rate limit + if (System.currentTimeMillis() - lastCollectionEndTime < minimumApiDelayMs) { + LOG.info("Rate limit active, skipping collection (delay: {}ms)", minimumApiDelayMs); + return; + } + + Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet(); + if (nodes.isEmpty()) { + LOG.warn("No datanodes found to query"); + resetState(); + currentStatus = MetricCollectionStatus.SUCCEEDED; + return; + } + + // Set status immediately before starting async collection + currentStatus = MetricCollectionStatus.IN_PROGRESS; + LOG.info("Starting metrics collection for {} datanodes", nodes.size()); Review Comment: We don't need an INFO-level log here, as this operation can run frequently (as often as every 30 seconds). ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for collecting and managing DataNode pending deletion metrics. + * Collects metrics asynchronously from all datanodes and provides aggregated results. 
+ */ +@Singleton +public class DataNodeMetricsService { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static final int MAX_POOL_SIZE = 500; + private static final int KEEP_ALIVE_TIME = 60; + private static final int QUEUE_CAPACITY = 500; + private static final int POLL_INTERVAL_MS = 200; + private static final int PER_NODE_TIMEOUT_MS = 120000; + + private final ThreadPoolExecutor executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelayMs; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private List<DatanodePendingDeletionMetrics> pendingDeletionList; + private Long totalPendingDeletion = 0L; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + + this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + this.minimumApiDelayMs = (int) config.getTimeDuration( + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; + this.lastCollectionEndTime = 0; + + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + this.executorService = new ThreadPoolExecutor( + corePoolSize, MAX_POOL_SIZE, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(QUEUE_CAPACITY), + new ThreadFactoryBuilder() + .setNameFormat("DataNodeMetricsCollector-%d") + .build()); + } + + /** + * Starts the metrics collection task if not already running and 
rate limit allows. + */ + public synchronized void startTask() { + // Check if already running + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { + LOG.warn("Metrics collection already in progress, skipping"); + return; + } + + // Check rate limit + if (System.currentTimeMillis() - lastCollectionEndTime < minimumApiDelayMs) { + LOG.info("Rate limit active, skipping collection (delay: {}ms)", minimumApiDelayMs); + return; + } + + Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet(); + if (nodes.isEmpty()) { + LOG.warn("No datanodes found to query"); + resetState(); + currentStatus = MetricCollectionStatus.SUCCEEDED; + return; + } + + // Set status immediately before starting async collection + currentStatus = MetricCollectionStatus.IN_PROGRESS; + LOG.info("Starting metrics collection for {} datanodes", nodes.size()); + + // Run collection asynchronously so status can be queried + CompletableFuture.runAsync(() -> collectMetrics(nodes), executorService) + .exceptionally(throwable -> { + LOG.error("Metrics collection failed", throwable); + synchronized (this) { + currentStatus = MetricCollectionStatus.SUCCEEDED; + } + return null; + }); + } + + /** + * Collects metrics from all datanodes. Processes completed tasks first, waits for all. 
+ */ + private void collectMetrics(Set<DatanodeDetails> nodes) { + // Initialize state + List<DatanodePendingDeletionMetrics> results = new ArrayList<>(nodes.size()); + long totalPending = 0L; + int failed = 0; + + // Submit all collection tasks + Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures = new HashMap<>(); + Map<DatanodePendingDeletionMetrics, Long> submissionTimes = new HashMap<>(); + + long submissionTime = System.currentTimeMillis(); + for (DatanodeDetails node : nodes) { + DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask( + node, httpsEnabled, metricsServiceProviderFactory); + DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics( + node.getHostName(), node.getUuidString(), -1L); + futures.put(key, executorService.submit(task)); + submissionTimes.put(key, submissionTime); + } + + int totalQueried = futures.size(); + LOG.debug("Submitted {} collection tasks", totalQueried); + + // Poll with timeout enforcement + while (!futures.isEmpty()) { + long currentTime = System.currentTimeMillis(); + Iterator<Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>>> + iterator = futures.entrySet().iterator(); + + boolean processedAny = false; + while (iterator.hasNext()) { + Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> entry = + iterator.next(); + DatanodePendingDeletionMetrics key = entry.getKey(); + Future<DatanodePendingDeletionMetrics> future = entry.getValue(); + + // Check if this task exceeded timeout + long elapsedTime = currentTime - submissionTimes.get(key); + if (elapsedTime > PER_NODE_TIMEOUT_MS && !future.isDone()) { + LOG.warn("Task for datanode {} [{}] timed out after {}ms", + key.getHostName(), key.getDatanodeUuid(), elapsedTime); + future.cancel(true); // Interrupt the task + failed++; + results.add(key); // Add with -1 (failed) + iterator.remove(); + processedAny = true; + continue; + } + + // Check if this task 
is done + if (future.isDone()) { + try { + DatanodePendingDeletionMetrics result = future.get(); + if (result.getPendingBlockSize() < 0) { + failed++; + } else { + totalPending += result.getPendingBlockSize(); + } + results.add(result); + LOG.debug("Processed result from {}", key.getHostName()); + } catch (ExecutionException | InterruptedException e) { + String errorType = e instanceof InterruptedException ? "interrupted" : "execution failed"; + LOG.error("Task {} for datanode {} [{}]: {}", + errorType, key.getHostName(), key.getDatanodeUuid(), e.getMessage()); + failed++; + results.add(key); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + break; + } + } + iterator.remove(); + processedAny = true; + } + } + + // Sleep before next poll only if there are remaining futures + if (!futures.isEmpty() && !processedAny) { + try { + Thread.sleep(POLL_INTERVAL_MS); + } catch (InterruptedException e) { + LOG.warn("Collection polling interrupted"); + Thread.currentThread().interrupt(); + break; + } + } + } + + // Update shared state atomically + synchronized (this) { + pendingDeletionList = results; + totalPendingDeletion = totalPending; + totalNodesQueried = totalQueried; + totalNodesFailed = failed; + currentStatus = MetricCollectionStatus.SUCCEEDED; + lastCollectionEndTime = System.currentTimeMillis(); + } + + LOG.info("Metrics collection completed. Queried: {}, Failed: {}", totalQueried, failed); + } + + /** + * Resets the collection state. 
+ */ + private void resetState() { + pendingDeletionList = new ArrayList<>(); + totalPendingDeletion = 0L; + totalNodesQueried = 0; + totalNodesFailed = 0; + } + + public synchronized DataNodeMetricsServiceResponse getCollectedMetrics() { + if (currentStatus != MetricCollectionStatus.SUCCEEDED) { + return DataNodeMetricsServiceResponse.newBuilder() + .setStatus(currentStatus) + .build(); + } + return DataNodeMetricsServiceResponse.newBuilder() + .setStatus(currentStatus) + .setPendingDeletion(pendingDeletionList) + .setTotalPendingDeletion(totalPendingDeletion) + .setTotalNodesQueried(totalNodesQueried) + .setTotalNodeQueryFailures(totalNodesFailed) + .build(); + } + + public MetricCollectionStatus getTaskStatus() { + return currentStatus; + } + + @PreDestroy Review Comment: What is the purpose of this annotation? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for collecting and managing DataNode pending deletion metrics. + * Collects metrics asynchronously from all datanodes and provides aggregated results. 
+ */ +@Singleton +public class DataNodeMetricsService { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static final int MAX_POOL_SIZE = 500; + private static final int KEEP_ALIVE_TIME = 60; + private static final int QUEUE_CAPACITY = 500; + private static final int POLL_INTERVAL_MS = 200; + private static final int PER_NODE_TIMEOUT_MS = 120000; + + private final ThreadPoolExecutor executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelayMs; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private List<DatanodePendingDeletionMetrics> pendingDeletionList; + private Long totalPendingDeletion = 0L; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + + this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + this.minimumApiDelayMs = (int) config.getTimeDuration( + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; + this.lastCollectionEndTime = 0; + + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + this.executorService = new ThreadPoolExecutor( + corePoolSize, MAX_POOL_SIZE, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(QUEUE_CAPACITY), + new ThreadFactoryBuilder() + .setNameFormat("DataNodeMetricsCollector-%d") + .build()); + } + + /** + * Starts the metrics collection task if not already running and 
rate limit allows. + */ + public synchronized void startTask() { + // Check if already running + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { + LOG.warn("Metrics collection already in progress, skipping"); + return; + } + + // Check rate limit + if (System.currentTimeMillis() - lastCollectionEndTime < minimumApiDelayMs) { + LOG.info("Rate limit active, skipping collection (delay: {}ms)", minimumApiDelayMs); + return; + } + + Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet(); + if (nodes.isEmpty()) { + LOG.warn("No datanodes found to query"); + resetState(); + currentStatus = MetricCollectionStatus.SUCCEEDED; + return; + } + + // Set status immediately before starting async collection + currentStatus = MetricCollectionStatus.IN_PROGRESS; + LOG.info("Starting metrics collection for {} datanodes", nodes.size()); + + // Run collection asynchronously so status can be queried + CompletableFuture.runAsync(() -> collectMetrics(nodes), executorService) + .exceptionally(throwable -> { + LOG.error("Metrics collection failed", throwable); + synchronized (this) { + currentStatus = MetricCollectionStatus.SUCCEEDED; + } + return null; + }); + } + + /** + * Collects metrics from all datanodes. Processes completed tasks first, waits for all. 
+ */ + private void collectMetrics(Set<DatanodeDetails> nodes) { + // Initialize state + List<DatanodePendingDeletionMetrics> results = new ArrayList<>(nodes.size()); + long totalPending = 0L; + int failed = 0; + + // Submit all collection tasks + Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures = new HashMap<>(); + Map<DatanodePendingDeletionMetrics, Long> submissionTimes = new HashMap<>(); + + long submissionTime = System.currentTimeMillis(); + for (DatanodeDetails node : nodes) { + DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask( + node, httpsEnabled, metricsServiceProviderFactory); + DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics( + node.getHostName(), node.getUuidString(), -1L); + futures.put(key, executorService.submit(task)); + submissionTimes.put(key, submissionTime); + } + + int totalQueried = futures.size(); + LOG.debug("Submitted {} collection tasks", totalQueried); + + // Poll with timeout enforcement + while (!futures.isEmpty()) { + long currentTime = System.currentTimeMillis(); + Iterator<Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>>> + iterator = futures.entrySet().iterator(); + + boolean processedAny = false; + while (iterator.hasNext()) { + Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> entry = + iterator.next(); + DatanodePendingDeletionMetrics key = entry.getKey(); + Future<DatanodePendingDeletionMetrics> future = entry.getValue(); + + // Check if this task exceeded timeout + long elapsedTime = currentTime - submissionTimes.get(key); + if (elapsedTime > PER_NODE_TIMEOUT_MS && !future.isDone()) { Review Comment: submission time and actual execution startup time can be different. 
If there are a large number of nodes and fewer threads are configured, tasks may wait in the queue long enough to hit this timeout before they even start executing, so this check may not work as intended. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for collecting and managing DataNode pending deletion metrics. + * Collects metrics asynchronously from all datanodes and provides aggregated results. 
+ */ +@Singleton +public class DataNodeMetricsService { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static final int MAX_POOL_SIZE = 500; + private static final int KEEP_ALIVE_TIME = 60; + private static final int QUEUE_CAPACITY = 500; + private static final int POLL_INTERVAL_MS = 200; + private static final int PER_NODE_TIMEOUT_MS = 120000; + + private final ThreadPoolExecutor executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelayMs; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private List<DatanodePendingDeletionMetrics> pendingDeletionList; + private Long totalPendingDeletion = 0L; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + + this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + this.minimumApiDelayMs = (int) config.getTimeDuration( + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; + this.lastCollectionEndTime = 0; + + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + this.executorService = new ThreadPoolExecutor( + corePoolSize, MAX_POOL_SIZE, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(QUEUE_CAPACITY), + new ThreadFactoryBuilder() + .setNameFormat("DataNodeMetricsCollector-%d") + .build()); + } + + /** + * Starts the metrics collection task if not already running and 
rate limit allows. + */ + public synchronized void startTask() { + // Check if already running + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { Review Comment: We can use AtomicBoolean's compareAndSet() to ensure only one task is running. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for collecting and managing DataNode pending deletion metrics. + * Collects metrics asynchronously from all datanodes and provides aggregated results. 
+ */ +@Singleton +public class DataNodeMetricsService { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static final int MAX_POOL_SIZE = 500; + private static final int KEEP_ALIVE_TIME = 60; + private static final int QUEUE_CAPACITY = 500; + private static final int POLL_INTERVAL_MS = 200; + private static final int PER_NODE_TIMEOUT_MS = 120000; + + private final ThreadPoolExecutor executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelayMs; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private List<DatanodePendingDeletionMetrics> pendingDeletionList; + private Long totalPendingDeletion = 0L; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + + this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + this.minimumApiDelayMs = (int) config.getTimeDuration( + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; + this.lastCollectionEndTime = 0; + + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + this.executorService = new ThreadPoolExecutor( + corePoolSize, MAX_POOL_SIZE, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(QUEUE_CAPACITY), + new ThreadFactoryBuilder() + .setNameFormat("DataNodeMetricsCollector-%d") + .build()); + } + + /** + * Starts the metrics collection task if not already running and 
rate limit allows. + */ + public synchronized void startTask() { + // Check if already running + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { + LOG.warn("Metrics collection already in progress, skipping"); + return; + } + + // Check rate limit + if (System.currentTimeMillis() - lastCollectionEndTime < minimumApiDelayMs) { + LOG.info("Rate limit active, skipping collection (delay: {}ms)", minimumApiDelayMs); + return; + } + + Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet(); + if (nodes.isEmpty()) { + LOG.warn("No datanodes found to query"); + resetState(); + currentStatus = MetricCollectionStatus.SUCCEEDED; + return; + } + + // Set status immediately before starting async collection + currentStatus = MetricCollectionStatus.IN_PROGRESS; + LOG.info("Starting metrics collection for {} datanodes", nodes.size()); + + // Run collection asynchronously so status can be queried + CompletableFuture.runAsync(() -> collectMetrics(nodes), executorService) + .exceptionally(throwable -> { + LOG.error("Metrics collection failed", throwable); + synchronized (this) { + currentStatus = MetricCollectionStatus.SUCCEEDED; + } + return null; + }); + } + + /** + * Collects metrics from all datanodes. Processes completed tasks first, waits for all. 
+ */ + private void collectMetrics(Set<DatanodeDetails> nodes) { + // Initialize state + List<DatanodePendingDeletionMetrics> results = new ArrayList<>(nodes.size()); + long totalPending = 0L; + int failed = 0; + + // Submit all collection tasks + Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures = new HashMap<>(); + Map<DatanodePendingDeletionMetrics, Long> submissionTimes = new HashMap<>(); + + long submissionTime = System.currentTimeMillis(); + for (DatanodeDetails node : nodes) { + DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask( + node, httpsEnabled, metricsServiceProviderFactory); + DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics( + node.getHostName(), node.getUuidString(), -1L); + futures.put(key, executorService.submit(task)); Review Comment: Can you verify whether submit() blocks when the queue is full? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for collecting and managing DataNode pending deletion metrics. + * Collects metrics asynchronously from all datanodes and provides aggregated results. 
+ */ +@Singleton +public class DataNodeMetricsService { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static final int MAX_POOL_SIZE = 500; + private static final int KEEP_ALIVE_TIME = 60; + private static final int QUEUE_CAPACITY = 500; + private static final int POLL_INTERVAL_MS = 200; + + private final ThreadPoolExecutor executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelayMs; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private List<DatanodePendingDeletionMetrics> pendingDeletionList; + private Long totalPendingDeletion = 0L; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + + this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + this.minimumApiDelayMs = (int) config.getTimeDuration( + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; + this.lastCollectionEndTime = 0; + + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + this.executorService = new ThreadPoolExecutor( + corePoolSize, MAX_POOL_SIZE, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(QUEUE_CAPACITY), + new ThreadFactoryBuilder() + .setNameFormat("DataNodeMetricsCollector-%d") + .build()); + } + + /** + * Starts the metrics collection task if not already running and rate limit allows. 
+ */ + public synchronized void startTask() { + // Check if already running + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { + LOG.warn("Metrics collection already in progress, skipping"); + return; + } + + // Check rate limit + if (System.currentTimeMillis() - lastCollectionEndTime < minimumApiDelayMs) { + LOG.info("Rate limit active, skipping collection (delay: {}ms)", minimumApiDelayMs); + return; + } + + Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet(); + if (nodes.isEmpty()) { + LOG.warn("No datanodes found to query"); + resetState(); + currentStatus = MetricCollectionStatus.SUCCEEDED; + return; + } + + // Set status immediately before starting async collection + currentStatus = MetricCollectionStatus.IN_PROGRESS; + LOG.info("Starting metrics collection for {} datanodes", nodes.size()); + + // Run collection asynchronously so status can be queried + CompletableFuture.runAsync(() -> collectMetrics(nodes), executorService) + .exceptionally(throwable -> { + LOG.error("Metrics collection failed", throwable); + synchronized (this) { + currentStatus = MetricCollectionStatus.SUCCEEDED; + } + return null; + }); + } + + /** + * Collects metrics from all datanodes. Processes completed tasks first, waits for all. 
+ */ + private void collectMetrics(Set<DatanodeDetails> nodes) { + // Initialize state + List<DatanodePendingDeletionMetrics> results = new ArrayList<>(nodes.size()); + long totalPending = 0L; + int failed = 0; + + // Submit all collection tasks + Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures = + new HashMap<>(); + for (DatanodeDetails node : nodes) { + DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask( + node, httpsEnabled, metricsServiceProviderFactory); + DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics( + node.getHostName(), node.getUuidString(), -1L); + futures.put(key, executorService.submit(task)); + } + + int totalQueried = futures.size(); + LOG.debug("Submitted {} collection tasks", totalQueried); + + // Poll for completed tasks, process them immediately, wait for all (no timeout) + while (!futures.isEmpty()) { Review Comment: This can happen if this task is submitted multiple times. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for collecting and managing DataNode pending deletion metrics. + * Collects metrics asynchronously from all datanodes and provides aggregated results. 
+ */ +@Singleton +public class DataNodeMetricsService { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static final int MAX_POOL_SIZE = 500; + private static final int KEEP_ALIVE_TIME = 60; + private static final int QUEUE_CAPACITY = 500; + private static final int POLL_INTERVAL_MS = 200; + private static final int PER_NODE_TIMEOUT_MS = 120000; + + private final ThreadPoolExecutor executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelayMs; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private List<DatanodePendingDeletionMetrics> pendingDeletionList; + private Long totalPendingDeletion = 0L; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + + this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + this.minimumApiDelayMs = (int) config.getTimeDuration( + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; + this.lastCollectionEndTime = 0; + + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + this.executorService = new ThreadPoolExecutor( + corePoolSize, MAX_POOL_SIZE, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(QUEUE_CAPACITY), + new ThreadFactoryBuilder() + .setNameFormat("DataNodeMetricsCollector-%d") + .build()); + } + + /** + * Starts the metrics collection task if not already running and 
rate limit allows. + */ + public synchronized void startTask() { + // Check if already running + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { + LOG.warn("Metrics collection already in progress, skipping"); + return; + } + + // Check rate limit + if (System.currentTimeMillis() - lastCollectionEndTime < minimumApiDelayMs) { + LOG.info("Rate limit active, skipping collection (delay: {}ms)", minimumApiDelayMs); Review Comment: This should be a debug log, as it can occur frequently on UI refresh. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
