priyeshkaratha commented on code in PR #9413:
URL: https://github.com/apache/ozone/pull/9413#discussion_r2621639366
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java:
##########

@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory;
+import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
+import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service for collecting and managing DataNode pending deletion metrics.
+ * Collects metrics asynchronously from all datanodes and provides aggregated results.
+ */
+@Singleton
+public class DataNodeMetricsService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class);
+  private static final int MAX_POOL_SIZE = 500;
+  private static final int KEEP_ALIVE_TIME = 60;
+  private static final int QUEUE_CAPACITY = 500;
+  private static final int POLL_INTERVAL_MS = 200;
+  private static final int PER_NODE_TIMEOUT_MS = 120000;
+
+  private final ThreadPoolExecutor executorService;
+  private final ReconNodeManager reconNodeManager;
+  private final boolean httpsEnabled;
+  private final int minimumApiDelayMs;
+  private final MetricsServiceProviderFactory metricsServiceProviderFactory;
+
+  private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED;
+  private List<DatanodePendingDeletionMetrics> pendingDeletionList;
+  private Long totalPendingDeletion = 0L;
+  private int totalNodesQueried;
+  private int totalNodesFailed;
+  private long lastCollectionEndTime;
+
+  @Inject
+  public DataNodeMetricsService(
+      OzoneStorageContainerManager reconSCM,
+      OzoneConfiguration config,
+      MetricsServiceProviderFactory metricsServiceProviderFactory) {
+
+    this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager();
+    this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled();
+    this.minimumApiDelayMs = (int) config.getTimeDuration(
+        OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY,
+        OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.metricsServiceProviderFactory = metricsServiceProviderFactory;
+    this.lastCollectionEndTime = 0;
+
+    int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
+    this.executorService = new ThreadPoolExecutor(
+        corePoolSize, MAX_POOL_SIZE,
+        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(QUEUE_CAPACITY),
+        new ThreadFactoryBuilder()
+            .setNameFormat("DataNodeMetricsCollector-%d")
+            .build());
+  }
+
+  /**
+   * Starts the metrics collection task if not already running and rate limit allows.
+   */
+  public synchronized void startTask() {
+    // Check if already running
+    if (currentStatus == MetricCollectionStatus.IN_PROGRESS) {
+      LOG.warn("Metrics collection already in progress, skipping");
+      return;
+    }
+
+    // Check rate limit
+    if (System.currentTimeMillis() - lastCollectionEndTime < minimumApiDelayMs) {
+      LOG.info("Rate limit active, skipping collection (delay: {}ms)", minimumApiDelayMs);
+      return;
+    }
+
+    Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet();
+    if (nodes.isEmpty()) {
+      LOG.warn("No datanodes found to query");
+      resetState();
+      currentStatus = MetricCollectionStatus.SUCCEEDED;
+      return;
+    }
+
+    // Set status immediately before starting async collection
+    currentStatus = MetricCollectionStatus.IN_PROGRESS;
+    LOG.info("Starting metrics collection for {} datanodes", nodes.size());
+
+    // Run collection asynchronously so status can be queried
+    CompletableFuture.runAsync(() -> collectMetrics(nodes), executorService)
+        .exceptionally(throwable -> {
+          LOG.error("Metrics collection failed", throwable);
+          synchronized (this) {
+            currentStatus = MetricCollectionStatus.SUCCEEDED;
+          }
+          return null;
+        });
+  }
+
+  /**
+   * Collects metrics from all datanodes. Processes completed tasks first, waits for all.
+   */
+  private void collectMetrics(Set<DatanodeDetails> nodes) {
+    // Initialize state
+    List<DatanodePendingDeletionMetrics> results = new ArrayList<>(nodes.size());
+    long totalPending = 0L;
+    int failed = 0;
+
+    // Submit all collection tasks
+    Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures = new HashMap<>();
+    Map<DatanodePendingDeletionMetrics, Long> submissionTimes = new HashMap<>();
+
+    long submissionTime = System.currentTimeMillis();
+    for (DatanodeDetails node : nodes) {
+      DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask(
+          node, httpsEnabled, metricsServiceProviderFactory);
+      DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics(
+          node.getHostName(), node.getUuidString(), -1L);
+      futures.put(key, executorService.submit(task));
+      submissionTimes.put(key, submissionTime);
+    }
+
+    int totalQueried = futures.size();
+    LOG.debug("Submitted {} collection tasks", totalQueried);
+
+    // Poll with timeout enforcement
+    while (!futures.isEmpty()) {
+      long currentTime = System.currentTimeMillis();
+      Iterator<Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>>>
+          iterator = futures.entrySet().iterator();

+      boolean processedAny = false;
+      while (iterator.hasNext()) {
+        Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> entry =
+            iterator.next();
+        DatanodePendingDeletionMetrics key = entry.getKey();
+        Future<DatanodePendingDeletionMetrics> future = entry.getValue();
+
+        // Check if this task exceeded timeout
+        long elapsedTime = currentTime - submissionTimes.get(key);
+        if (elapsedTime > PER_NODE_TIMEOUT_MS && !future.isDone()) {
+          LOG.warn("Task for datanode {} [{}] timed out after {}ms",
+              key.getHostName(), key.getDatanodeUuid(), elapsedTime);
+          future.cancel(true); // Interrupt the task
+          failed++;
+          results.add(key); // Add with -1 (failed)
+          iterator.remove();
+          processedAny = true;
+          continue;
+        }
+
+        // Check if this task is done
+        if (future.isDone()) {
+          try {
+            DatanodePendingDeletionMetrics result = future.get();
+            if (result.getPendingBlockSize() < 0) {
+              failed++;
+            } else {
+              totalPending += result.getPendingBlockSize();
+            }
+            results.add(result);
+            LOG.debug("Processed result from {}", key.getHostName());
+          } catch (ExecutionException | InterruptedException e) {
+            String errorType = e instanceof InterruptedException ? "interrupted" : "execution failed";
+            LOG.error("Task {} for datanode {} [{}]: {}",
+                errorType, key.getHostName(), key.getDatanodeUuid(), e.getMessage());
+            failed++;
+            results.add(key);
+            if (e instanceof InterruptedException) {
+              Thread.currentThread().interrupt();
+              break;
+            }
+          }
+          iterator.remove();
+          processedAny = true;
+        }
+      }
+
+      // Sleep before next poll only if there are remaining futures
+      if (!futures.isEmpty() && !processedAny) {
+        try {
+          Thread.sleep(POLL_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          LOG.warn("Collection polling interrupted");
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+
+    // Update shared state atomically
+    synchronized (this) {
+      pendingDeletionList = results;
+      totalPendingDeletion = totalPending;
+      totalNodesQueried = totalQueried;
+      totalNodesFailed = failed;
+      currentStatus = MetricCollectionStatus.SUCCEEDED;
+      lastCollectionEndTime = System.currentTimeMillis();
+    }
+
+    LOG.info("Metrics collection completed. Queried: {}, Failed: {}", totalQueried, failed);
+  }
+
+  /**
+   * Resets the collection state.
+   */
+  private void resetState() {
+    pendingDeletionList = new ArrayList<>();
+    totalPendingDeletion = 0L;
+    totalNodesQueried = 0;
+    totalNodesFailed = 0;
+  }
+
+  public synchronized DataNodeMetricsServiceResponse getCollectedMetrics() {
+    if (currentStatus != MetricCollectionStatus.SUCCEEDED) {
+      return DataNodeMetricsServiceResponse.newBuilder()
+          .setStatus(currentStatus)
+          .build();
+    }
+    return DataNodeMetricsServiceResponse.newBuilder()
+        .setStatus(currentStatus)
+        .setPendingDeletion(pendingDeletionList)
+        .setTotalPendingDeletion(totalPendingDeletion)
+        .setTotalNodesQueried(totalNodesQueried)
+        .setTotalNodeQueryFailures(totalNodesFailed)
+        .build();
+  }
+
+  public MetricCollectionStatus getTaskStatus() {
+    return currentStatus;
+  }
+
+  @PreDestroy

Review Comment:
   The method annotated with @PreDestroy is called by the container to signal that the instance is about to be destroyed. This is the ideal place to perform cleanup operations such as closing database connections, stopping background threads, or releasing file handles. So here we are stopping the thread pool.
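The @PreDestroy method body itself is cut off by the quoted hunk, so as a rough illustration of the cleanup the comment describes, a minimal sketch of such a shutdown method inside DataNodeMetricsService might look like the following. The method name stop() and the five-second grace period are assumptions, not code from the PR; the sketch relies only on the executorService field and the javax.annotation.PreDestroy and TimeUnit imports already present in the diff.

  @PreDestroy
  public void stop() {
    // Hypothetical sketch: the real method body is truncated in the hunk above.
    // Stop accepting new collection tasks.
    executorService.shutdown();
    try {
      // Give in-flight datanode queries a bounded window to finish.
      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
        // Interrupt anything still running and discard queued work.
        executorService.shutdownNow();
      }
    } catch (InterruptedException e) {
      executorService.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

Shutting the pool down at destroy time matters because the ThreadFactoryBuilder above does not mark the collector threads as daemon threads, so they could otherwise linger after the container tears the service down.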
