ChenSammi commented on code in PR #9413:
URL: https://github.com/apache/ozone/pull/9413#discussion_r2626231117


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory;
+import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
+import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service for collecting and managing DataNode pending deletion metrics.
+ * Collects metrics asynchronously from all datanodes and provides aggregated 
results.
+ */
+@Singleton
+public class DataNodeMetricsService {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataNodeMetricsService.class);
+  private static final int MAX_POOL_SIZE = 500;
+  private static final int KEEP_ALIVE_TIME = 5;
+  private static final int POLL_INTERVAL_MS = 200;
+
+  private final ThreadPoolExecutor executorService;
+  private final ReconNodeManager reconNodeManager;
+  private final boolean httpsEnabled;
+  private final int minimumApiDelayMs;
+  private final MetricsServiceProviderFactory metricsServiceProviderFactory;
+  private final int maximumTaskTimeout;
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
+  
+  private MetricCollectionStatus currentStatus = 
MetricCollectionStatus.NOT_STARTED;
+  private List<DatanodePendingDeletionMetrics> pendingDeletionList;
+  private Long totalPendingDeletion = 0L;
+  private int totalNodesQueried;
+  private int totalNodesFailed;
+  private AtomicLong lastCollectionEndTime = new AtomicLong(0L);
+
+  @Inject
+  public DataNodeMetricsService(
+      OzoneStorageContainerManager reconSCM,
+      OzoneConfiguration config,
+      MetricsServiceProviderFactory metricsServiceProviderFactory) {
+    
+    this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager();
+    this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled();
+    this.minimumApiDelayMs = (int) config.getTimeDuration(
+        OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY,
+        OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.maximumTaskTimeout = (int) 
config.getTimeDuration(OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT,
+        OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT, 
TimeUnit.MILLISECONDS);
+    this.metricsServiceProviderFactory = metricsServiceProviderFactory;
+    this.lastCollectionEndTime.set(-minimumApiDelayMs);
+    int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
+    this.executorService = new ThreadPoolExecutor(
+        corePoolSize, MAX_POOL_SIZE,
+        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder()
+            .setNameFormat("DataNodeMetricsCollector-%d")
+            .build());
+  }
+
+  /**
+   * Starts the metrics collection task if not already running and rate limit 
allows.
+   */
+  public void startTask() {
+    // Check if already running
+    if (!isRunning.compareAndSet(false, true)) {
+      LOG.warn("Metrics collection already in progress, skipping");
+      return;
+    }
+    
+    // Check rate limit
+    if (System.currentTimeMillis() - lastCollectionEndTime.get() < 
minimumApiDelayMs) {
+      LOG.debug("Rate limit active, skipping collection (delay: {}ms)", 
minimumApiDelayMs);
+      isRunning.set(false);
+      return;
+    }
+
+    Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet();
+    if (nodes.isEmpty()) {
+      LOG.warn("No datanodes found to query");
+      resetState();
+      currentStatus = MetricCollectionStatus.SUCCEEDED;
+      isRunning.set(false);
+      return;
+    }
+
+    // Set status immediately before starting async collection
+    currentStatus = MetricCollectionStatus.IN_PROGRESS;
+    LOG.debug("Starting metrics collection for {} datanodes", nodes.size());
+    
+    // Run a collection asynchronously so status can be queried
+    CompletableFuture.runAsync(() -> collectMetrics(nodes), executorService)
+        .exceptionally(throwable -> {
+          LOG.error("Metrics collection failed", throwable);
+          synchronized (DataNodeMetricsService.this) {
+            currentStatus = MetricCollectionStatus.SUCCEEDED;
+            isRunning.set(false);
+          }
+          return null;
+        });
+  }
+
+  /**
+   * Collects metrics from all datanodes. Processes completed tasks first, 
waits for all.
+   */
+  private void collectMetrics(Set<DatanodeDetails> nodes) {
+    CollectionContext context = submitMetricsCollectionTasks(nodes);
+    processCollectionFutures(context);
+    updateFinalState(context);
+  }
+
+  /**
+   * Submits metrics collection tasks for all given datanodes.
+   * @return A context object containing tracking structures for the submitted 
futures.
+   */
+  private CollectionContext submitMetricsCollectionTasks(Set<DatanodeDetails> 
nodes) {
+    // Initialize state
+    List<DatanodePendingDeletionMetrics> results = new 
ArrayList<>(nodes.size());
+    // Submit all collection tasks
+    Map<DatanodePendingDeletionMetrics, 
Future<DatanodePendingDeletionMetrics>> futures = new HashMap<>();
+    Map<DatanodePendingDeletionMetrics, Long> submissionTimes = new 
HashMap<>();
+
+    long submissionTime = System.currentTimeMillis();
+    for (DatanodeDetails node : nodes) {
+      DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask(
+          node, httpsEnabled, metricsServiceProviderFactory);
+      DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics(
+          node.getHostName(), node.getUuidString(), -1L); // -1 is used as 
placeholder/failed status
+      futures.put(key, executorService.submit(task));
+      submissionTimes.put(key, submissionTime);
+    }
+    int totalQueried = futures.size();
+    LOG.debug("Submitted {} collection tasks", totalQueried);
+    return new CollectionContext(totalQueried, futures, submissionTimes, 
results);
+  }
+
+  /**
+   * Polls the submitted futures, enforcing timeouts and aggregating results 
until all are complete.
+   */
+  private void processCollectionFutures(CollectionContext context) {
+    // Poll with timeout enforcement
+    while (!context.futures.isEmpty()) {
+      long currentTime = System.currentTimeMillis();
+      Iterator<Map.Entry<DatanodePendingDeletionMetrics, 
Future<DatanodePendingDeletionMetrics>>>
+          iterator = context.futures.entrySet().iterator();
+      boolean processedAny = false;
+      while (iterator.hasNext()) {
+        Map.Entry<DatanodePendingDeletionMetrics, 
Future<DatanodePendingDeletionMetrics>> entry =
+            iterator.next();
+        DatanodePendingDeletionMetrics key = entry.getKey();
+        Future<DatanodePendingDeletionMetrics> future = entry.getValue();
+        // Check for timeout
+        if (checkAndHandleTimeout(key, future, context, currentTime)) {
+          iterator.remove();
+          processedAny = true;
+          continue;
+        }
+        // Check for completion
+        if (future.isDone()) {
+          handleCompletedFuture(key, future, context);
+          iterator.remove();
+          processedAny = true;
+        }
+      }
+      // Sleep before the next poll only if there are remaining futures and 
nothing was processed
+      if (!context.futures.isEmpty() && !processedAny) {
+        try {
+          Thread.sleep(POLL_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          LOG.warn("Collection polling interrupted");
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
+  private boolean checkAndHandleTimeout(
+      DatanodePendingDeletionMetrics key, 
Future<DatanodePendingDeletionMetrics> future,
+      CollectionContext context, long currentTime) {
+    long elapsedTime = currentTime - context.submissionTimes.get(key);
+    if (elapsedTime > maximumTaskTimeout && !future.isDone()) {
+      LOG.warn("Task for datanode {} [{}] timed out after {}ms",
+          key.getHostName(), key.getDatanodeUuid(), elapsedTime);
+      future.cancel(true); // Interrupt the task
+      context.failed++;
+      context.results.add(key); // Add with -1 (failed)
+      return true;
+    }
+    return false;
+  }
+
+  private void handleCompletedFuture(
+      DatanodePendingDeletionMetrics key, 
Future<DatanodePendingDeletionMetrics> future,
+      CollectionContext context) {
+    try {
+      DatanodePendingDeletionMetrics result = future.get();
+      if (result.getPendingBlockSize() < 0) {
+        context.failed++;
+      } else {
+        context.totalPending += result.getPendingBlockSize();
+      }
+      context.results.add(result);
+      LOG.debug("Processed result from {}", key.getHostName());
+    } catch (ExecutionException | InterruptedException e) {
+      String errorType = e instanceof InterruptedException ? "interrupted" : 
"execution failed";
+      LOG.error("Task {} for datanode {} [{}] failed",
+          errorType, key.getHostName(), key.getDatanodeUuid(), e);
+      context.failed++;
+      context.results.add(key);
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Atomically updates the class's shared state with the results from the 
collection context.
+   */
+  private void updateFinalState(CollectionContext context) {
+    // Update shared state atomically
+    synchronized (this) {
+      pendingDeletionList = context.results;
+      totalPendingDeletion = context.totalPending;
+      totalNodesQueried = context.totalQueried;
+      totalNodesFailed = context.failed;
+      currentStatus = MetricCollectionStatus.SUCCEEDED;
+      isRunning.set(false);
+      lastCollectionEndTime.set(System.currentTimeMillis());
+    }
+
+    LOG.debug("Metrics collection completed. Queried: {}, Failed: {}",
+        context.totalQueried, context.failed);
+  }
+
+  /**
+   * Resets the collection state.
+   */
+  private void resetState() {
+    pendingDeletionList = new ArrayList<>();
+    totalPendingDeletion = 0L;
+    totalNodesQueried = 0;
+    totalNodesFailed = 0;
+  }
+
+  public DataNodeMetricsServiceResponse getCollectedMetrics() {
+    CompletableFuture.runAsync(this::startTask);

Review Comment:
   Is there a specific reason for using CompletableFuture.runAsync() instead of calling startTask() directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to