Copilot commented on code in PR #9406:
URL: https://github.com/apache/ozone/pull/9406#discussion_r2577972822


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)
+  /**
+   * Starts the periodic metrics update task.
+   *
+   * @param conf OzoneConfiguration for reading update interval
+   */
+  public void start(OzoneConfiguration conf) {
+    long updateInterval = 
conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true);
+    updateTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        updateMetricsAsync();
+      }
+    }, 0, updateInterval);
+
+    // Do initial update
+    updateMetricsAsync();

Review Comment:
   Initial metrics update is triggered twice: once synchronously in the Timer 
task schedule at line 105, and once by the timer immediately (delay=0 at line 
102). This causes duplicate computation at startup. Consider removing line 105 
and relying solely on the timer's initial execution.
   ```suggestion
       // Removed redundant initial update; timer will execute immediately.
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java:
##########
@@ -261,6 +264,37 @@ public DBCheckpointMetrics getDBCheckpointMetrics() {
     return dbCheckpointMetrics;
   }
 
+  // Add getter with lazy initialization check
+  public OMSnapshotDirectoryMetrics getSnapshotDirectoryMetrics() {
+    if (snapshotDirectoryMetrics == null) {
+      throw new IllegalStateException(
+          "SnapshotDirectoryMetrics not initialized. Call 
startSnapshotDirectoryMetrics() first.");
+    }
+    return snapshotDirectoryMetrics;
+  }
+
+  /**
+   * Starts periodic updates for snapshot directory metrics.
+   * Creates the metrics instance if it doesn't exist.
+   *
+   * @param configuration OzoneConfiguration for reading update interval
+   * @param metadataManager OMMetadataManager for accessing snapshot 
directories
+   */
+  public void startSnapshotDirectoryMetrics(OzoneConfiguration configuration,
+      OMMetadataManager metadataManager) {
+    if (snapshotDirectoryMetrics == null) {
+      snapshotDirectoryMetrics = OMSnapshotDirectoryMetrics.create("OM 
Metrics", metadataManager);
+    }
+    snapshotDirectoryMetrics.start(configuration);
+  }
+
+  // Add stop method (for cleanup)

Review Comment:
   The comment "Add stop method (for cleanup)" appears to be a development note 
and should be removed. The JavaDoc or method name should be sufficient to 
explain its purpose.
   ```suggestion
   
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java:
##########
@@ -261,6 +264,37 @@ public DBCheckpointMetrics getDBCheckpointMetrics() {
     return dbCheckpointMetrics;
   }
 
+  // Add getter with lazy initialization check
+  public OMSnapshotDirectoryMetrics getSnapshotDirectoryMetrics() {
+    if (snapshotDirectoryMetrics == null) {
+      throw new IllegalStateException(
+          "SnapshotDirectoryMetrics not initialized. Call 
startSnapshotDirectoryMetrics() first.");
+    }
+    return snapshotDirectoryMetrics;
+  }

Review Comment:
   The `getSnapshotDirectoryMetrics()` method throws an `IllegalStateException` 
if metrics are not initialized, but this exception is not documented in 
JavaDoc. Consider adding a `@throws` tag to document this behavior for API 
consumers.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)
+  /**
+   * Starts the periodic metrics update task.
+   *
+   * @param conf OzoneConfiguration for reading update interval
+   */
+  public void start(OzoneConfiguration conf) {
+    long updateInterval = 
conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true);
+    updateTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        updateMetricsAsync();
+      }
+    }, 0, updateInterval);
+
+    // Do initial update
+    updateMetricsAsync();
+  }
+
+  /**
+   * Stops the periodic metrics update task.
+   */
+  public void stop() {
+    if (updateTimer != null) {
+      updateTimer.cancel();
+      updateTimer = null;
+    }
+  }
+
+  public void unRegister() {
+    stop();
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Internal class to store per-checkpoint metrics.
+   */
+  private static class CheckpointMetrics {
+    private final long size;
+    private final int sstFileCount;
+
+    CheckpointMetrics(long size, int sstFileCount) {
+      this.size = size;
+      this.sstFileCount = sstFileCount;
+    }
+
+    public long getSize() {
+      return size;
+    }
+
+    public int getSstFileCount() {
+      return sstFileCount;
+    }
+  }
+
+  private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) {
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(String parent,
+      OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME,
+        parent,
+        new OMSnapshotDirectoryMetrics(metadataManager));
+  }
+
+  /**
+   * Updates all metrics (aggregate and per-checkpoint) asynchronously
+   * in a background thread.
+   */
+  public void updateMetricsAsync() {
+    CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get();
+    if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) {
+      return;
+    }
+
+    CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> {
+      try {
+        updateMetrics();
+        lastUpdateTime.set(System.currentTimeMillis());
+      } catch (Exception e) {
+        LOG.warn("Failed to update snapshot directory metrics", e);
+      } finally {
+        currentUpdateFutureRef.set(null);
+      }
+    });
+
+    currentUpdateFutureRef.set(newFuture);
+  }
+
+  /**
+   * Updates all metrics synchronously - both aggregate and 
per-checkpoint-directory.
+   */
+  @VisibleForTesting
+  void updateMetrics() throws IOException {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return;
+    }
+
+    RDBStore rdbStore = (RDBStore) store;
+    String snapshotsParentDir = rdbStore.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      long totalSize = FileUtils.sizeOfDirectory(snapshotsDir);
+      dbSnapshotsDirSize.set(totalSize);
+
+      // Calculate per-checkpoint-directory metrics and aggregate totals
+      File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory);
+      int totalSstCount = 0;
+      int snapshotCount = 0;
+      Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>();
+
+      if (checkpointDirs != null) {
+        snapshotCount = checkpointDirs.length;
+
+        for (File checkpointDir : checkpointDirs) {
+          if (!checkpointDir.isDirectory()) {
+            continue;
+          }
+
+          String checkpointDirName = checkpointDir.getName();
+          long checkpointSize = 0;
+          int sstFileCount = 0;
+
+          try {
+            checkpointSize = FileUtils.sizeOfDirectory(checkpointDir);
+            File[] sstFiles = checkpointDir.listFiles((dir, name) ->
+                name.toLowerCase().endsWith(ROCKSDB_SST_SUFFIX));
+            if (sstFiles != null) {
+              sstFileCount = sstFiles.length;
+            }
+          } catch (Exception e) {
+            LOG.debug("Error calculating metrics for checkpoint directory {}",
+                checkpointDirName, e);
+            // Continue with other directories even if one fails
+            continue;
+          }
+
+          totalSstCount += sstFileCount;
+          newCheckpointMetricsMap.put(checkpointDirName,
+              new CheckpointMetrics(checkpointSize, sstFileCount));
+        }
+      }
+
+      // Update aggregate metrics
+      totalSstFilesCount.set(totalSstCount);
+      numSnapshots.set(snapshotCount);
+
+      // Atomically update per-checkpoint metrics map
+      checkpointMetricsMap = newCheckpointMetricsMap;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updated snapshot directory metrics: size={}, sstFiles={}, 
snapshots={}",
+            totalSize, totalSstCount, snapshotCount);
+      }
+
+    } catch (Exception e) {
+      LOG.warn("Error calculating snapshot directory metrics", e);
+      resetMetrics();
+    }
+  }
+
+  /**
+   * Resets all metrics to zero.
+   */
+  private void resetMetrics() {
+    dbSnapshotsDirSize.set(0);
+    totalSstFilesCount.set(0);
+    numSnapshots.set(0);
+    checkpointMetricsMap = new HashMap<>();
+  }
+
+  /**
+   * Implements MetricsSource to provide metrics.
+   * Reads from cached values updated by updateMetrics().
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    // Add aggregate metrics
+    collector.addRecord(SOURCE_NAME)
+        .setContext("Snapshot Directory Metrics")
+        .addGauge(SnapshotMetricsInfo.DbSnapshotsDirSize, 
dbSnapshotsDirSize.value())
+        .addGauge(SnapshotMetricsInfo.TotalSstFilesCount, 
totalSstFilesCount.value())
+        .addGauge(SnapshotMetricsInfo.NumSnapshots, numSnapshots.value());
+
+    // Add per-checkpoint-directory metrics from cached map
+    Map<String, CheckpointMetrics> currentMetrics = checkpointMetricsMap;
+    for (Map.Entry<String, CheckpointMetrics> entry : 
currentMetrics.entrySet()) {
+      String checkpointDirName = entry.getKey();
+      CheckpointMetrics metrics = entry.getValue();
+
+      collector.addRecord(SOURCE_NAME)
+          .setContext("Per-Checkpoint Directory Metrics")
+          .tag(SnapshotMetricsInfo.CheckpointDirName, checkpointDirName)
+          .addGauge(SnapshotMetricsInfo.CheckpointDirSize, metrics.getSize())
+          .addGauge(SnapshotMetricsInfo.CheckpointSstFilesCount, 
metrics.getSstFileCount());
+    }
+  }
+
+  @VisibleForTesting
+  public long getDbSnapshotsDirSize() {
+    return dbSnapshotsDirSize.value();
+  }
+
+  @VisibleForTesting
+  public long getTotalSstFilesCount() {
+    return totalSstFilesCount.value();
+  }
+
+  @VisibleForTesting
+  public long getNumSnapshots() {
+    return numSnapshots.value();
+  }
+
+  @VisibleForTesting
+  public long getLastUpdateTime() {
+    return lastUpdateTime.get();
+  }
+
+  @VisibleForTesting
+  public Map<String, CheckpointMetrics> getCheckpointMetricsMap() {
+    return Collections.unmodifiableMap(new HashMap<>(checkpointMetricsMap));
+  }
+
+  /**
+   * Metrics info enum for snapshot directory metrics.
+   */
+  enum SnapshotMetricsInfo implements MetricsInfo {
+    // Aggregate metrics
+    DbSnapshotsDirSize("Total size of db.snapshots directory in bytes"),
+    TotalSstFilesCount("Total number of SST files across all snapshots"),
+    NumSnapshots("Total number of snapshot checkpoint directories"),
+
+    // Per-checkpoint-directory metric tag
+    CheckpointDirName("Checkpoint directory name"),
+
+    // Per-checkpoint-directory metrics
+    CheckpointDirSize("Size of checkpoint directory in bytes"),
+    CheckpointSstFilesCount("Number of SST files in checkpoint directory");
+
+    private final String desc;
+
+    SnapshotMetricsInfo(String desc) {
+      this.desc = desc;
+    }
+
+    @Override
+    public String description() {
+      return desc;
+    }
+  }
+}

Review Comment:
   The new `OMSnapshotDirectoryMetrics` class lacks test coverage. Given that 
there are comprehensive tests for other snapshot-related classes in 
`hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/`,
 this new metrics class should have unit tests covering:
   - Async metrics update behavior
   - Directory size and SST file counting logic
   - Error handling when directories don't exist
   - Timer lifecycle (start/stop)
   - Concurrent update prevention
   - Per-checkpoint metrics collection
   
   Consider adding a `TestOMSnapshotDirectoryMetrics` test class.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)
+  /**
+   * Starts the periodic metrics update task.
+   *
+   * @param conf OzoneConfiguration for reading update interval
+   */
+  public void start(OzoneConfiguration conf) {
+    long updateInterval = 
conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true);
+    updateTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        updateMetricsAsync();
+      }
+    }, 0, updateInterval);
+
+    // Do initial update
+    updateMetricsAsync();
+  }
+
+  /**
+   * Stops the periodic metrics update task.
+   */
+  public void stop() {
+    if (updateTimer != null) {
+      updateTimer.cancel();
+      updateTimer = null;
+    }
+  }
+
+  public void unRegister() {
+    stop();
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Internal class to store per-checkpoint metrics.
+   */
+  private static class CheckpointMetrics {
+    private final long size;
+    private final int sstFileCount;
+
+    CheckpointMetrics(long size, int sstFileCount) {
+      this.size = size;
+      this.sstFileCount = sstFileCount;
+    }
+
+    public long getSize() {
+      return size;
+    }
+
+    public int getSstFileCount() {
+      return sstFileCount;
+    }
+  }
+
+  private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) {
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(String parent,
+      OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME,
+        parent,
+        new OMSnapshotDirectoryMetrics(metadataManager));
+  }
+
+  /**
+   * Updates all metrics (aggregate and per-checkpoint) asynchronously
+   * in a background thread.
+   */
+  public void updateMetricsAsync() {
+    CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get();
+    if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) {
+      return;
+    }
+
+    CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> {
+      try {
+        updateMetrics();
+        lastUpdateTime.set(System.currentTimeMillis());
+      } catch (Exception e) {
+        LOG.warn("Failed to update snapshot directory metrics", e);
+      } finally {
+        currentUpdateFutureRef.set(null);
+      }
+    });
+
+    currentUpdateFutureRef.set(newFuture);

Review Comment:
   The `currentUpdateFutureRef` is set to `null` in the `finally` block (line 
174) before checking if the new future should be set. This creates a race 
condition where:
   1. Thread A completes and sets ref to null (line 174)
   2. Thread B calls `updateMetricsAsync()`, sees null, and sets a new future 
(line 178)
   3. Thread A continues and overwrites with its future (line 178)
   
   This could cause concurrent update executions. Consider setting the future 
reference before starting the async task, or use `compareAndSet` to atomically 
update the reference.
   ```suggestion
         }
           // No need to set currentUpdateFutureRef to null here.
         }
       });
   
       // Atomically set the new future only if the previous is null or done.
       while (true) {
         CompletableFuture<Void> prev = currentUpdateFutureRef.get();
         if (prev == null || prev.isDone()) {
           if (currentUpdateFutureRef.compareAndSet(prev, newFuture)) {
             break;
           }
         } else {
           // Another update started, do not overwrite.
           break;
         }
       }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)
+  /**
+   * Starts the periodic metrics update task.
+   *
+   * @param conf OzoneConfiguration for reading update interval
+   */
+  public void start(OzoneConfiguration conf) {
+    long updateInterval = 
conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true);
+    updateTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        updateMetricsAsync();
+      }
+    }, 0, updateInterval);
+
+    // Do initial update
+    updateMetricsAsync();
+  }

Review Comment:
   Calling `start()` multiple times creates multiple Timer instances without 
canceling the previous one, causing a Timer leak. The old timer will continue 
running even after a new one is created. Consider checking if `updateTimer` is 
already non-null and either throwing an exception or canceling the old timer 
before creating a new one.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)
+  /**
+   * Starts the periodic metrics update task.
+   *
+   * @param conf OzoneConfiguration for reading update interval
+   */
+  public void start(OzoneConfiguration conf) {
+    long updateInterval = 
conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true);
+    updateTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        updateMetricsAsync();
+      }
+    }, 0, updateInterval);
+
+    // Do initial update
+    updateMetricsAsync();
+  }
+
+  /**
+   * Stops the periodic metrics update task.
+   */
+  public void stop() {
+    if (updateTimer != null) {
+      updateTimer.cancel();
+      updateTimer = null;
+    }
+  }
+
+  public void unRegister() {
+    stop();
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Internal class to store per-checkpoint metrics.
+   */
+  private static class CheckpointMetrics {
+    private final long size;
+    private final int sstFileCount;
+
+    CheckpointMetrics(long size, int sstFileCount) {
+      this.size = size;
+      this.sstFileCount = sstFileCount;
+    }
+
+    public long getSize() {
+      return size;
+    }
+
+    public int getSstFileCount() {
+      return sstFileCount;
+    }
+  }
+
+  private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) {
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(String parent,
+      OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME,
+        parent,
+        new OMSnapshotDirectoryMetrics(metadataManager));
+  }
+
+  /**
+   * Updates all metrics (aggregate and per-checkpoint) asynchronously
+   * in a background thread.
+   */
+  public void updateMetricsAsync() {
+    CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get();
+    if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) {
+      return;
+    }
+
+    CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> {
+      try {
+        updateMetrics();
+        lastUpdateTime.set(System.currentTimeMillis());
+      } catch (Exception e) {
+        LOG.warn("Failed to update snapshot directory metrics", e);
+      } finally {
+        currentUpdateFutureRef.set(null);
+      }
+    });
+
+    currentUpdateFutureRef.set(newFuture);
+  }
+
+  /**
+   * Updates all metrics synchronously - both aggregate and 
per-checkpoint-directory.
+   */
+  @VisibleForTesting
+  void updateMetrics() throws IOException {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return;
+    }
+
+    RDBStore rdbStore = (RDBStore) store;
+    String snapshotsParentDir = rdbStore.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      long totalSize = FileUtils.sizeOfDirectory(snapshotsDir);
+      dbSnapshotsDirSize.set(totalSize);
+
+      // Calculate per-checkpoint-directory metrics and aggregate totals
+      File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory);
+      int totalSstCount = 0;
+      int snapshotCount = 0;
+      Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>();
+
+      if (checkpointDirs != null) {
+        snapshotCount = checkpointDirs.length;
+
+        for (File checkpointDir : checkpointDirs) {
+          if (!checkpointDir.isDirectory()) {
+            continue;
+          }
+
+          String checkpointDirName = checkpointDir.getName();
+          long checkpointSize = 0;
+          int sstFileCount = 0;
+
+          try {
+            checkpointSize = FileUtils.sizeOfDirectory(checkpointDir);
+            File[] sstFiles = checkpointDir.listFiles((dir, name) ->
+                name.toLowerCase().endsWith(ROCKSDB_SST_SUFFIX));
+            if (sstFiles != null) {
+              sstFileCount = sstFiles.length;
+            }
+          } catch (Exception e) {
+            LOG.debug("Error calculating metrics for checkpoint directory {}",
+                checkpointDirName, e);
+            // Continue with other directories even if one fails
+            continue;
+          }
+
+          totalSstCount += sstFileCount;
+          newCheckpointMetricsMap.put(checkpointDirName,
+              new CheckpointMetrics(checkpointSize, sstFileCount));
+        }
+      }
+
+      // Update aggregate metrics
+      totalSstFilesCount.set(totalSstCount);
+      numSnapshots.set(snapshotCount);
+
+      // Atomically update per-checkpoint metrics map
+      checkpointMetricsMap = newCheckpointMetricsMap;

Review Comment:
   The HashMap instantiation is not thread-safe when read by `getMetrics()`. 
While the volatile reference ensures visibility of the map reference itself, 
there's a small window where `checkpointMetricsMap` could be partially 
constructed. Consider using `Collections.unmodifiableMap()` to wrap the HashMap 
before assignment to ensure safe publication.
   ```suggestion
         checkpointMetricsMap = 
Collections.unmodifiableMap(newCheckpointMetricsMap);
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java:
##########
@@ -261,6 +264,37 @@ public DBCheckpointMetrics getDBCheckpointMetrics() {
     return dbCheckpointMetrics;
   }
 
+  // Add getter with lazy initialization check

Review Comment:
   The comment "Add getter with lazy initialization check" appears to be a 
development note and should be removed. The JavaDoc below already documents the 
method's behavior.
   ```suggestion
   
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)

Review Comment:
   The comment "Add start method (after unRegister method)" appears to be a 
note for positioning the method during development. It should be removed as it 
doesn't provide meaningful documentation about the method's purpose.
   ```suggestion
   
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:

Review Comment:
   The comment "Change the field declaration:" appears to be leftover from 
development. It should be removed as it doesn't add value to the code 
documentation.
   ```suggestion
   
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java:
##########
@@ -1950,6 +1954,9 @@ public void restart() throws IOException {
     metricsTimer = new Timer();
     metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);
 
+    // Start snapshot directory metrics updates

Review Comment:
   The `startSnapshotDirectoryMetrics()` method is called in both `start()` 
(line 1873) and `restart()` (line 1958). If `restart()` is called after 
`start()`, the metrics instance will call `start()` again (line 288 in 
OMMetrics), which will create a new Timer without canceling the old one (see 
related issue in OMSnapshotDirectoryMetrics). This could lead to multiple 
concurrent timer threads updating the same metrics. Consider checking if the 
metrics are already started before calling `start()` again, or ensuring proper 
cleanup before restart.
   ```suggestion
       // Start snapshot directory metrics updates
       metrics.stopSnapshotDirectoryMetrics();
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)
+  /**
+   * Starts the periodic metrics update task.
+   *
+   * @param conf OzoneConfiguration for reading update interval
+   */
+  public void start(OzoneConfiguration conf) {
+    long updateInterval = 
conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true);
+    updateTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        updateMetricsAsync();
+      }
+    }, 0, updateInterval);
+
+    // Do initial update
+    updateMetricsAsync();
+  }
+
+  /**
+   * Stops the periodic metrics update task.
+   */
+  public void stop() {
+    if (updateTimer != null) {
+      updateTimer.cancel();
+      updateTimer = null;
+    }
+  }
+
+  public void unRegister() {
+    stop();
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Internal class to store per-checkpoint metrics.
+   */
+  private static class CheckpointMetrics {
+    private final long size;
+    private final int sstFileCount;
+
+    CheckpointMetrics(long size, int sstFileCount) {
+      this.size = size;
+      this.sstFileCount = sstFileCount;
+    }
+
+    public long getSize() {
+      return size;
+    }
+
+    public int getSstFileCount() {
+      return sstFileCount;
+    }
+  }
+
+  private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) {
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(String parent,
+      OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME,
+        parent,
+        new OMSnapshotDirectoryMetrics(metadataManager));
+  }
+
+  /**
+   * Updates all metrics (aggregate and per-checkpoint) asynchronously
+   * in a background thread.
+   */
+  public void updateMetricsAsync() {
+    CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get();
+    if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) {
+      return;
+    }
+
+    CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> {
+      try {
+        updateMetrics();
+        lastUpdateTime.set(System.currentTimeMillis());
+      } catch (Exception e) {
+        LOG.warn("Failed to update snapshot directory metrics", e);
+      } finally {
+        currentUpdateFutureRef.set(null);
+      }
+    });
+
+    currentUpdateFutureRef.set(newFuture);
+  }
+
+  /**
+   * Updates all metrics synchronously - both aggregate and 
per-checkpoint-directory.
+   */
+  @VisibleForTesting
+  void updateMetrics() throws IOException {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return;
+    }
+
+    RDBStore rdbStore = (RDBStore) store;
+    String snapshotsParentDir = rdbStore.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      long totalSize = FileUtils.sizeOfDirectory(snapshotsDir);
+      dbSnapshotsDirSize.set(totalSize);
+
+      // Calculate per-checkpoint-directory metrics and aggregate totals
+      File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory);
+      int totalSstCount = 0;
+      int snapshotCount = 0;
+      Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>();
+
+      if (checkpointDirs != null) {
+        snapshotCount = checkpointDirs.length;
+
+        for (File checkpointDir : checkpointDirs) {
+          if (!checkpointDir.isDirectory()) {
+            continue;
+          }

Review Comment:
   [nitpick] The `isDirectory()` check at line 222 is redundant because 
`listFiles(File::isDirectory)` at line 213 already filters for directories 
only. The check can be safely removed to simplify the code.
   ```suggestion
   
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)
+  /**
+   * Starts the periodic metrics update task.
+   *
+   * @param conf OzoneConfiguration for reading update interval
+   */
+  public void start(OzoneConfiguration conf) {
+    long updateInterval = 
conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true);
+    updateTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        updateMetricsAsync();
+      }
+    }, 0, updateInterval);
+
+    // Do initial update
+    updateMetricsAsync();
+  }
+
+  /**
+   * Stops the periodic metrics update task.
+   */
+  public void stop() {
+    if (updateTimer != null) {
+      updateTimer.cancel();
+      updateTimer = null;
+    }
+  }
+
+  public void unRegister() {
+    stop();
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Internal class to store per-checkpoint metrics.
+   */
+  private static class CheckpointMetrics {
+    private final long size;
+    private final int sstFileCount;
+
+    CheckpointMetrics(long size, int sstFileCount) {
+      this.size = size;
+      this.sstFileCount = sstFileCount;
+    }
+
+    public long getSize() {
+      return size;
+    }
+
+    public int getSstFileCount() {
+      return sstFileCount;
+    }
+  }
+
+  private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) {
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(String parent,
+      OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME,
+        parent,
+        new OMSnapshotDirectoryMetrics(metadataManager));
+  }
+
+  /**
+   * Updates all metrics (aggregate and per-checkpoint) asynchronously
+   * in a background thread.
+   */
+  public void updateMetricsAsync() {
+    CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get();
+    if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) {
+      return;
+    }
+
+    CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> {
+      try {
+        updateMetrics();
+        lastUpdateTime.set(System.currentTimeMillis());
+      } catch (Exception e) {
+        LOG.warn("Failed to update snapshot directory metrics", e);
+      } finally {
+        currentUpdateFutureRef.set(null);
+      }
+    });
+
+    currentUpdateFutureRef.set(newFuture);
+  }
+
+  /**
+   * Updates all metrics synchronously - both aggregate and 
per-checkpoint-directory.
+   */
+  @VisibleForTesting
+  void updateMetrics() throws IOException {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return;
+    }
+
+    RDBStore rdbStore = (RDBStore) store;
+    String snapshotsParentDir = rdbStore.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      long totalSize = FileUtils.sizeOfDirectory(snapshotsDir);
+      dbSnapshotsDirSize.set(totalSize);
+
+      // Calculate per-checkpoint-directory metrics and aggregate totals
+      File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory);
+      int totalSstCount = 0;
+      int snapshotCount = 0;
+      Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>();
+
+      if (checkpointDirs != null) {
+        snapshotCount = checkpointDirs.length;
+
+        for (File checkpointDir : checkpointDirs) {
+          if (!checkpointDir.isDirectory()) {
+            continue;
+          }
+
+          String checkpointDirName = checkpointDir.getName();
+          long checkpointSize = 0;
+          int sstFileCount = 0;
+
+          try {
+            checkpointSize = FileUtils.sizeOfDirectory(checkpointDir);
+            File[] sstFiles = checkpointDir.listFiles((dir, name) ->
+                name.toLowerCase().endsWith(ROCKSDB_SST_SUFFIX));
+            if (sstFiles != null) {
+              sstFileCount = sstFiles.length;
+            }
+          } catch (Exception e) {
+            LOG.debug("Error calculating metrics for checkpoint directory {}",
+                checkpointDirName, e);
+            // Continue with other directories even if one fails
+            continue;
+          }
+
+          totalSstCount += sstFileCount;
+          newCheckpointMetricsMap.put(checkpointDirName,
+              new CheckpointMetrics(checkpointSize, sstFileCount));
+        }
+      }
+
+      // Update aggregate metrics
+      totalSstFilesCount.set(totalSstCount);
+      numSnapshots.set(snapshotCount);
+
+      // Atomically update per-checkpoint metrics map
+      checkpointMetricsMap = newCheckpointMetricsMap;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updated snapshot directory metrics: size={}, sstFiles={}, 
snapshots={}",
+            totalSize, totalSstCount, snapshotCount);
+      }
+
+    } catch (Exception e) {
+      LOG.warn("Error calculating snapshot directory metrics", e);
+      resetMetrics();
+    }
+  }
+
+  /**
+   * Resets all metrics to zero.
+   */
+  private void resetMetrics() {
+    dbSnapshotsDirSize.set(0);
+    totalSstFilesCount.set(0);
+    numSnapshots.set(0);
+    checkpointMetricsMap = new HashMap<>();
+  }
+
+  /**
+   * Implements MetricsSource to provide metrics.
+   * Reads from cached values updated by updateMetrics().
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    // Add aggregate metrics
+    collector.addRecord(SOURCE_NAME)
+        .setContext("Snapshot Directory Metrics")
+        .addGauge(SnapshotMetricsInfo.DbSnapshotsDirSize, 
dbSnapshotsDirSize.value())
+        .addGauge(SnapshotMetricsInfo.TotalSstFilesCount, 
totalSstFilesCount.value())
+        .addGauge(SnapshotMetricsInfo.NumSnapshots, numSnapshots.value());
+
+    // Add per-checkpoint-directory metrics from cached map
+    Map<String, CheckpointMetrics> currentMetrics = checkpointMetricsMap;
+    for (Map.Entry<String, CheckpointMetrics> entry : 
currentMetrics.entrySet()) {
+      String checkpointDirName = entry.getKey();
+      CheckpointMetrics metrics = entry.getValue();
+
+      collector.addRecord(SOURCE_NAME)
+          .setContext("Per-Checkpoint Directory Metrics")
+          .tag(SnapshotMetricsInfo.CheckpointDirName, checkpointDirName)
+          .addGauge(SnapshotMetricsInfo.CheckpointDirSize, metrics.getSize())
+          .addGauge(SnapshotMetricsInfo.CheckpointSstFilesCount, 
metrics.getSstFileCount());
+    }
+  }
+
+  @VisibleForTesting
+  public long getDbSnapshotsDirSize() {
+    return dbSnapshotsDirSize.value();
+  }
+
+  @VisibleForTesting
+  public long getTotalSstFilesCount() {
+    return totalSstFilesCount.value();
+  }
+
+  @VisibleForTesting
+  public long getNumSnapshots() {
+    return numSnapshots.value();
+  }
+
+  @VisibleForTesting
+  public long getLastUpdateTime() {
+    return lastUpdateTime.get();
+  }
+
+  @VisibleForTesting
+  public Map<String, CheckpointMetrics> getCheckpointMetricsMap() {
+    return Collections.unmodifiableMap(new HashMap<>(checkpointMetricsMap));

Review Comment:
   [nitpick] Creating a new HashMap copy in `getCheckpointMetricsMap()` for 
every call (even for test purposes) is inefficient. Since the map is already 
volatile and the `CheckpointMetrics` objects are immutable, you could return 
`Collections.unmodifiableMap(checkpointMetricsMap)` directly without the extra 
HashMap copy. The volatile guarantee ensures visibility of the reference.
   ```suggestion
       return Collections.unmodifiableMap(checkpointMetricsMap);
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);

Review Comment:
   The `registry` field is declared but never used in the class. It should 
either be removed or utilized for metrics registration if that was the intended 
design pattern. Currently, metrics are registered through annotations (@Metric) 
and the custom `getMetrics()` implementation.
   ```suggestion
   
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  // Change the field declaration:
+  private final AtomicReference<CompletableFuture<Void>> 
currentUpdateFutureRef =
+      new AtomicReference<>();
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  // Per-checkpoint-directory metrics storage
+  private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new 
HashMap<>();
+
+  private Timer updateTimer;
+
+  // Add start method (after unRegister method)
+  /**
+   * Starts the periodic metrics update task.
+   *
+   * @param conf OzoneConfiguration for reading update interval
+   */
+  public void start(OzoneConfiguration conf) {
+    long updateInterval = 
conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true);
+    updateTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        updateMetricsAsync();
+      }
+    }, 0, updateInterval);
+
+    // Do initial update
+    updateMetricsAsync();
+  }
+
+  /**
+   * Stops the periodic metrics update task.
+   */
+  public void stop() {
+    if (updateTimer != null) {
+      updateTimer.cancel();
+      updateTimer = null;
+    }
+  }
+
+  public void unRegister() {
+    stop();
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Internal class to store per-checkpoint metrics.
+   */
+  private static class CheckpointMetrics {
+    private final long size;
+    private final int sstFileCount;
+
+    CheckpointMetrics(long size, int sstFileCount) {
+      this.size = size;
+      this.sstFileCount = sstFileCount;
+    }
+
+    public long getSize() {
+      return size;
+    }
+
+    public int getSstFileCount() {
+      return sstFileCount;
+    }
+  }
+
+  private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) {
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(String parent,
+      OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME,
+        parent,
+        new OMSnapshotDirectoryMetrics(metadataManager));
+  }
+
+  /**
+   * Updates all metrics (aggregate and per-checkpoint) asynchronously
+   * in a background thread.
+   */
+  public void updateMetricsAsync() {
+    CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get();
+    if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) {
+      return;
+    }
+
+    CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> {
+      try {
+        updateMetrics();
+        lastUpdateTime.set(System.currentTimeMillis());
+      } catch (Exception e) {
+        LOG.warn("Failed to update snapshot directory metrics", e);
+      } finally {
+        currentUpdateFutureRef.set(null);
+      }
+    });
+
+    currentUpdateFutureRef.set(newFuture);
+  }
+
+  /**
+   * Updates all metrics synchronously - both aggregate and 
per-checkpoint-directory.
+   */
+  @VisibleForTesting
+  void updateMetrics() throws IOException {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return;
+    }
+
+    RDBStore rdbStore = (RDBStore) store;
+    String snapshotsParentDir = rdbStore.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      long totalSize = FileUtils.sizeOfDirectory(snapshotsDir);
+      dbSnapshotsDirSize.set(totalSize);
+
+      // Calculate per-checkpoint-directory metrics and aggregate totals
+      File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory);
+      int totalSstCount = 0;
+      int snapshotCount = 0;
+      Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>();
+
+      if (checkpointDirs != null) {
+        snapshotCount = checkpointDirs.length;
+
+        for (File checkpointDir : checkpointDirs) {
+          if (!checkpointDir.isDirectory()) {
+            continue;
+          }
+
+          String checkpointDirName = checkpointDir.getName();
+          long checkpointSize = 0;
+          int sstFileCount = 0;
+
+          try {
+            checkpointSize = FileUtils.sizeOfDirectory(checkpointDir);
+            File[] sstFiles = checkpointDir.listFiles((dir, name) ->
+                name.toLowerCase().endsWith(ROCKSDB_SST_SUFFIX));
+            if (sstFiles != null) {
+              sstFileCount = sstFiles.length;
+            }
+          } catch (Exception e) {
+            LOG.debug("Error calculating metrics for checkpoint directory {}",
+                checkpointDirName, e);
+            // Continue with other directories even if one fails
+            continue;
+          }
+
+          totalSstCount += sstFileCount;
+          newCheckpointMetricsMap.put(checkpointDirName,
+              new CheckpointMetrics(checkpointSize, sstFileCount));

Review Comment:
   [nitpick] The `FileUtils.sizeOfDirectory()` call can be expensive for large 
directory trees and may hold directory handles during traversal. Additionally, 
the `listFiles()` operations could potentially fail with unchecked exceptions 
in edge cases. While the code does catch generic exceptions at line 237 and 
continues, it would be more robust to add a timeout mechanism or limit the time 
spent on each directory to prevent blocking the metrics thread for too long.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to