Copilot commented on code in PR #9406:
URL: https://github.com/apache/ozone/pull/9406#discussion_r2632451752


##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -2055,6 +2055,15 @@
       file. Unit could be defined with postfix (ns,ms,s,m,h,d)
     </description>
   </property>
+  <property>
+    <name>ozone.om.snapshot.directory.metrics.update.interval</name>
+    <value>5m</value>
+    <tag>OZONE, OM</tag>
+    <description>Time interval used to update the space consumption stats of the
+      Ozone Manager snapshot directories. Background thread periodically saves these

Review Comment:
   The description says "periodically saves these stats" but the metrics are 
not saved to disk - they are calculated and updated in memory for serving via 
the metrics system. Consider rephrasing to "Background thread periodically 
calculates and updates these stats."
   ```suggestion
      Ozone Manager snapshot directories. Background thread periodically calculates and updates these
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  OMSnapshotDirectoryMetrics(ConfigurationSource conf,
+      OMMetadataManager metadataManager) {
+    super("OMSnapshotDirectoryMetrics",
+        conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(ConfigurationSource conf,
+      String parent, OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, parent,
+        new OMSnapshotDirectoryMetrics(conf, metadataManager));
+  }
+
+  /**
+   * @return if the update was successful.
+   * Updates all metrics synchronously - both aggregate and per-checkpoint-directory.
+   */
+  @Override
+  protected boolean updateMetrics() {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return false;
+    }
+
+    String snapshotsParentDir = store.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return false;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return false;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      calculateAndUpdateMetrics(snapshotsDir);
+    } catch (Exception e) {
+      LOG.warn("Error calculating snapshot directory metrics", e);
+      resetMetrics();
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Calculates & updates directory size metrics accounting for hardlinks.
+   * (only counts each inode once).
+   * Uses Files.getAttribute to get the inode number and tracks visited inodes.
+   *
+   * @param directory the directory containing all checkpointDirs.
+   */
+  private void calculateAndUpdateMetrics(File directory) throws IOException {
+    Set<Object> visitedInodes = new HashSet<>();
+    long totalSize = 0;
+    long sstFileCount = 0;
+    int snapshotCount = 0;
+    try (Stream<Path> checkpointDirs = Files.list(directory.toPath())) {
+      for (Path checkpointDir : checkpointDirs.collect(Collectors.toList())) {
+        snapshotCount++;
+        try (Stream<Path> files = Files.list(checkpointDir)) {
+          for (Path path : files.collect(Collectors.toList())) {
+            if (Files.isRegularFile(path)) {
+              try {
+                // Get inode number
+                Object fileKey = IOUtils.getINode(path);
+                if (fileKey == null) {
+                  // Fallback: use file path + size as unique identifier
+                  fileKey = path.toAbsolutePath() + ":" + Files.size(path);
+                }
+                // Only count this file if we haven't seen this inode before
+                if (visitedInodes.add(fileKey)) {
+                  if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) {
+                    sstFileCount++;
+                  }
+                  totalSize += Files.size(path);
+                }
+              } catch (UnsupportedOperationException | IOException e) {
+                // Fallback: if we can't get inode, just count the file size.
+                LOG.error("Could not get inode for {}, using file size 
directly: {}",
+                    path, e.getMessage());
+                totalSize += Files.size(path);
+              }
+            }
+          }
+        }
+      }
+    }
+    numSnapshots.set(snapshotCount);
+    totalSstFilesCount.set(sstFileCount);
+    dbSnapshotsDirSize.set(totalSize);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updated snapshot directory metrics: size={}, sstFiles={}, 
snapshots={}",
+          totalSize, sstFileCount, snapshotCount);
+    }
+  }
+
+  /**
+   * Resets all metrics to zero.
+   */
+  private void resetMetrics() {
+    dbSnapshotsDirSize.set(0);
+    totalSstFilesCount.set(0);
+    numSnapshots.set(0);
+  }
+
+  /**
+   * Implements MetricsSource to provide metrics.
+   * Reads from cached values updated by updateMetrics().
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    // Add aggregate metrics
+    collector.addRecord(SOURCE_NAME)
+        .setContext("Snapshot Directory Metrics")
+        .addGauge(SnapshotMetricsInfo.DbSnapshotsDirSize, dbSnapshotsDirSize.value())
+        .addGauge(SnapshotMetricsInfo.TotalSstFilesCount, totalSstFilesCount.value())
+        .addGauge(SnapshotMetricsInfo.NumSnapshots, numSnapshots.value())
+        .addGauge(SnapshotMetricsInfo.LastUpdateTime, getLastUpdateTime());
+  }
+
+  @VisibleForTesting
+  public long getDbSnapshotsDirSize() {
+    return dbSnapshotsDirSize.value();
+  }
+
+  @VisibleForTesting
+  public long getTotalSstFilesCount() {
+    return totalSstFilesCount.value();
+  }
+
+  @VisibleForTesting
+  public long getNumSnapshots() {
+    return numSnapshots.value();
+  }
+
+  public void unRegister() {
+    stop();
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Metrics info enum for snapshot directory metrics.
+   */
+  enum SnapshotMetricsInfo implements MetricsInfo {
+    // Aggregate metrics
+    DbSnapshotsDirSize("Total size of db.snapshots directory in bytes"),
+    TotalSstFilesCount("Total number of SST files across all snapshots"),
+    NumSnapshots("Total number of snapshot checkpoint directories"),
+    LastUpdateTime("Time stamp when the snapshot directory metrics were last 
updated");
+
+    private final String desc;
+
+    SnapshotMetricsInfo(String desc) {
+      this.desc = desc;
+    }
+
+    @Override
+    public String description() {
+      return desc;
+    }
+  }
+}

Review Comment:
   This new metrics class lacks test coverage. Key scenarios that should be 
tested include: handling of hardlinks (inode deduplication), SST file counting, 
snapshot directory counting, behavior when inode retrieval fails, and handling 
of missing/invalid snapshot directories. Consider adding unit tests similar to 
TestBucketUtilizationMetrics.java in the same test directory.
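   As a rough starting point (assuming JUnit 5 and a POSIX filesystem; the test class and names below are hypothetical), the hardlink-deduplication scenario can be covered with a self-contained temp directory, without wiring up the OM:
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;

   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.attribute.BasicFileAttributes;
   import java.util.HashSet;
   import java.util.Set;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;

   class TestSnapshotDirSizeDedup {

     @TempDir
     Path snapshotsDir; // stands in for the db.snapshots parent directory

     @Test
     void hardlinkedSstIsCountedOnce() throws Exception {
       Path checkpoint1 = Files.createDirectory(snapshotsDir.resolve("checkpoint1"));
       Path checkpoint2 = Files.createDirectory(snapshotsDir.resolve("checkpoint2"));
       Path sst = Files.write(checkpoint1.resolve("000001.sst"), new byte[1024]);
       // Hardlink the same SST into the second checkpoint directory.
       Files.createLink(checkpoint2.resolve("000001.sst"), sst);

       long totalSize = 0;
       Set<Object> visitedInodes = new HashSet<>();
       try (Stream<Path> dirs = Files.list(snapshotsDir)) {
         for (Path dir : dirs.collect(Collectors.toList())) {
           try (Stream<Path> files = Files.list(dir)) {
             for (Path f : files.collect(Collectors.toList())) {
               // Same dedup idea as the patch: fileKey() is the inode on POSIX.
               Object key = Files.readAttributes(f, BasicFileAttributes.class).fileKey();
               if (visitedInodes.add(key)) {
                 totalSize += Files.size(f);
               }
             }
           }
         }
       }
       // Two links, one inode: the 1024 bytes must be counted exactly once.
       assertEquals(1024L, totalSize);
     }
   }
   ```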



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.

Review Comment:
   The class javadoc states "Provides both aggregate metrics and 
per-checkpoint-directory metrics" but the implementation only provides 
aggregate metrics (dbSnapshotsDirSize, totalSstFilesCount, numSnapshots). There 
are no per-checkpoint-directory metrics. Consider updating the documentation to 
accurately reflect the implementation.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  OMSnapshotDirectoryMetrics(ConfigurationSource conf,
+      OMMetadataManager metadataManager) {
+    super("OMSnapshotDirectoryMetrics",
+        conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(ConfigurationSource conf,
+      String parent, OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, parent,
+        new OMSnapshotDirectoryMetrics(conf, metadataManager));
+  }
+
+  /**
+   * @return if the update was successful.
+   * Updates all metrics synchronously - both aggregate and per-checkpoint-directory.

Review Comment:
   The comment "Updates all metrics synchronously - both aggregate and 
per-checkpoint-directory" is inaccurate. The implementation only updates 
aggregate metrics (dbSnapshotsDirSize, totalSstFilesCount, numSnapshots). There 
are no per-checkpoint-directory metrics being updated. Consider revising to 
"Updates aggregate metrics synchronously."
   ```suggestion
      * Updates aggregate metrics synchronously.
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMPeriodicMetrics.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic framework for metrics that need to get updated on a specified interval.
+ * A single threaded scheduled thread pool executor is created.
+ * The implementing class should only define the logic in updateMetrics()
+ */
+public abstract class OMPeriodicMetrics {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMPeriodicMetrics.class);
+  private final AtomicLong lastUpdateTime = new AtomicLong(0);
+  private ScheduledExecutorService updateExecutor;
+  private ScheduledFuture<?> updateTask;
+  private final String metricsTaskName;
+  private final long updateInterval;
+  private volatile boolean started = false;
+
+  protected OMPeriodicMetrics(String metricsTaskName, long updateInterval) {
+    if (metricsTaskName == null || metricsTaskName.isEmpty()) {
+      throw new IllegalArgumentException("metricsTaskName cannot be null or 
empty");
+    }
+    if (updateInterval <= 0) {
+      throw new IllegalArgumentException("updateInterval must be positive");
+    }
+    this.metricsTaskName = metricsTaskName;
+    this.updateInterval = updateInterval;
+  }
+
+  public void start() {
+    if (started) {
+      LOG.warn("Periodic metrics '{}' already started, ignoring duplicate 
start()",
+          metricsTaskName);
+      return;
+    }
+    updateExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, metricsTaskName);
+      t.setDaemon(true);
+      return t;
+    });
+    // Schedule periodic updates
+    updateTask = updateExecutor.scheduleWithFixedDelay(() -> {
+      try {
+        boolean success = updateMetrics();
+        if (success) {
+          lastUpdateTime.set(System.currentTimeMillis());
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to update metrics for periodic metrics", e);
+      }
+    }, 0, updateInterval, TimeUnit.MILLISECONDS);
+    started = true;
+  }
+
+  /**
+   * Updates the metrics periodically. This method is called by the framework
+   * at the configured interval after {@link #start()} is called.
+   * <p>
+   * Implementations should perform the actual metrics calculation and update
+   * logic here. The method should be thread-safe as it may be called from
+   * the scheduled executor thread.
+   *
+   * @return {@code true} if the metrics update was successful,
+   *         {@code false} if the update should be considered unsuccessful
+   *         (e.g., due to missing prerequisites or non-fatal errors).
+   *         When {@code false} is returned, {@link #getLastUpdateTime()}
+   *         will not be updated.
+   */
+  protected abstract boolean updateMetrics();
+
+  /**
+   * Stops the periodic metrics update task.
+   */
+  public void stop() {
+    if (!started) {
+      return;  // Already stopped or never started
+    }
+    if (updateTask != null) {
+      updateTask.cancel(false); // Don't interrupt if running
+      updateTask = null;
+    }
+
+    if (updateExecutor != null) {
+      updateExecutor.shutdown();
+      try {
+        // Wait for any running updateMetrics() to complete (with timeout)
+        if (!updateExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+          LOG.warn("Metrics update executor did not terminate in time, forcing 
shutdown");
+          updateExecutor.shutdownNow();
+          // Wait a bit more for cancellation to take effect
+          if (!updateExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+            LOG.error("Metrics update executor did not terminate after force 
shutdown");
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        updateExecutor.shutdownNow();
+      }
+      updateExecutor = null;
+    }
+    started = false; // Reset
+  }
+
+  public long getLastUpdateTime() {
+    return lastUpdateTime.get();
+  }
+}

Review Comment:
   This new generic framework class lacks test coverage. Key scenarios that 
should be tested include: start/stop lifecycle, duplicate start calls, 
concurrent stop calls, exception handling during updateMetrics, thread 
interruption during shutdown, and lastUpdateTime tracking. Consider adding 
comprehensive unit tests for this foundational class.
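   A minimal lifecycle sketch (assuming JUnit 5; it relies only on the API visible in this diff, and the test name is hypothetical) could anchor such a suite:
   ```java
   import static org.junit.jupiter.api.Assertions.assertTrue;

   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics;
   import org.junit.jupiter.api.Test;

   class TestOMPeriodicMetrics {

     @Test
     void successfulUpdateSetsLastUpdateTime() throws Exception {
       CountDownLatch ran = new CountDownLatch(1);
       // Anonymous subclass: start() should run this with an initial delay of 0.
       OMPeriodicMetrics metrics = new OMPeriodicMetrics("test-periodic-metrics", 10) {
         @Override
         protected boolean updateMetrics() {
           ran.countDown();
           return true; // success, so the framework records the update time
         }
       };
       metrics.start();
       try {
         assertTrue(ran.await(5, TimeUnit.SECONDS), "updateMetrics was never invoked");
         // lastUpdateTime is written after updateMetrics() returns, so poll briefly.
         long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
         while (metrics.getLastUpdateTime() == 0 && System.nanoTime() < deadline) {
           Thread.sleep(10);
         }
         assertTrue(metrics.getLastUpdateTime() > 0, "lastUpdateTime was not set");
       } finally {
         metrics.stop();
         metrics.stop(); // a duplicate stop() should be a harmless no-op
       }
     }
   }
   ```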



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  OMSnapshotDirectoryMetrics(ConfigurationSource conf,
+      OMMetadataManager metadataManager) {
+    super("OMSnapshotDirectoryMetrics",
+        conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(ConfigurationSource conf,
+      String parent, OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, parent,
+        new OMSnapshotDirectoryMetrics(conf, metadataManager));
+  }
+
+  /**
+   * @return if the update was successful.
+   * Updates all metrics synchronously - both aggregate and per-checkpoint-directory.
+   */
+  @Override
+  protected boolean updateMetrics() {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return false;
+    }
+
+    String snapshotsParentDir = store.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return false;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return false;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      calculateAndUpdateMetrics(snapshotsDir);
+    } catch (Exception e) {
+      LOG.warn("Error calculating snapshot directory metrics", e);
+      resetMetrics();
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Calculates & updates directory size metrics accounting for hardlinks.
+   * (only counts each inode once).
+   * Uses Files.getAttribute to get the inode number and tracks visited inodes.
+   *
+   * @param directory the directory containing all checkpointDirs.
+   */
+  private void calculateAndUpdateMetrics(File directory) throws IOException {
+    Set<Object> visitedInodes = new HashSet<>();
+    long totalSize = 0;
+    long sstFileCount = 0;
+    int snapshotCount = 0;
+    try (Stream<Path> checkpointDirs = Files.list(directory.toPath())) {
+      for (Path checkpointDir : checkpointDirs.collect(Collectors.toList())) {
+        snapshotCount++;
+        try (Stream<Path> files = Files.list(checkpointDir)) {
+          for (Path path : files.collect(Collectors.toList())) {
+            if (Files.isRegularFile(path)) {
+              try {
+                // Get inode number
+                Object fileKey = IOUtils.getINode(path);
+                if (fileKey == null) {
+                  // Fallback: use file path + size as unique identifier
+                  fileKey = path.toAbsolutePath() + ":" + Files.size(path);
+                }
+                // Only count this file if we haven't seen this inode before
+                if (visitedInodes.add(fileKey)) {
+                  if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) {
+                    sstFileCount++;
+                  }
+                  totalSize += Files.size(path);
+                }
+              } catch (UnsupportedOperationException | IOException e) {
+                // Fallback: if we can't get inode, just count the file size.
+                LOG.error("Could not get inode for {}, using file size 
directly: {}",
+                    path, e.getMessage());
+                totalSize += Files.size(path);

Review Comment:
   The snapshot count is incremented for every entry in the snapshots 
directory, including non-directory files. The counter should only be 
incremented for directories since only directories represent snapshots. Add a 
check for Files.isDirectory(checkpointDir) before incrementing snapshotCount.
   ```suggestion
           if (Files.isDirectory(checkpointDir)) {
             snapshotCount++;
             try (Stream<Path> files = Files.list(checkpointDir)) {
               for (Path path : files.collect(Collectors.toList())) {
                 if (Files.isRegularFile(path)) {
                   try {
                     // Get inode number
                     Object fileKey = IOUtils.getINode(path);
                     if (fileKey == null) {
                       // Fallback: use file path + size as unique identifier
                       fileKey = path.toAbsolutePath() + ":" + Files.size(path);
                     }
                  // Only count this file if we haven't seen this inode before
                     if (visitedInodes.add(fileKey)) {
                    if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) {
                         sstFileCount++;
                       }
                       totalSize += Files.size(path);
                     }
                   } catch (UnsupportedOperationException | IOException e) {
                  // Fallback: if we can't get inode, just count the file size.
                  LOG.error("Could not get inode for {}, using file size directly: {}",
                         path, e.getMessage());
                     totalSize += Files.size(path);
                   }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  OMSnapshotDirectoryMetrics(ConfigurationSource conf,
+      OMMetadataManager metadataManager) {
+    super("OMSnapshotDirectoryMetrics",
+        conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(ConfigurationSource conf,
+      String parent, OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, parent,
+        new OMSnapshotDirectoryMetrics(conf, metadataManager));
+  }
+
+  /**
+   * @return if the update was successful.
+   * Updates all metrics synchronously - both aggregate and per-checkpoint-directory.
+   */
+  @Override
+  protected boolean updateMetrics() {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return false;
+    }
+
+    String snapshotsParentDir = store.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return false;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return false;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      calculateAndUpdateMetrics(snapshotsDir);
+    } catch (Exception e) {
+      LOG.warn("Error calculating snapshot directory metrics", e);
+      resetMetrics();
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Calculates & updates directory size metrics accounting for hardlinks.
+   * (only counts each inode once).
+   * Uses Files.getAttribute to get the inode number and tracks visited inodes.
+   *
+   * @param directory the directory containing all checkpointDirs.
+   */
+  private void calculateAndUpdateMetrics(File directory) throws IOException {
+    Set<Object> visitedInodes = new HashSet<>();
+    long totalSize = 0;
+    long sstFileCount = 0;
+    int snapshotCount = 0;
+    try (Stream<Path> checkpointDirs = Files.list(directory.toPath())) {
+      for (Path checkpointDir : checkpointDirs.collect(Collectors.toList())) {
+        snapshotCount++;
+        try (Stream<Path> files = Files.list(checkpointDir)) {
+          for (Path path : files.collect(Collectors.toList())) {
+            if (Files.isRegularFile(path)) {
+              try {
+                // Get inode number
+                Object fileKey = IOUtils.getINode(path);
+                if (fileKey == null) {
+                  // Fallback: use file path + size as unique identifier
+                  fileKey = path.toAbsolutePath() + ":" + Files.size(path);
+                }
+                // Only count this file if we haven't seen this inode before
+                if (visitedInodes.add(fileKey)) {
+                  if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) {
+                    sstFileCount++;
+                  }
+                  totalSize += Files.size(path);
+                }
+              } catch (UnsupportedOperationException | IOException e) {
+                // Fallback: if we can't get inode, just count the file size.
+                LOG.error("Could not get inode for {}, using file size 
directly: {}",
+                    path, e.getMessage());
+                totalSize += Files.size(path);

Review Comment:
   The fallback path does not increment the SST file count. When inode 
retrieval fails, the code falls back to adding the file size directly but skips 
the SST file counting logic. This means SST files that fail inode retrieval 
won't be counted in the totalSstFilesCount metric, leading to inaccurate 
metrics. The SST file check and counter increment should be moved outside the 
try-catch block or duplicated in the catch block.
   ```suggestion
                // Fallback: if we can't get inode, just count the file size and,
                // if applicable, the SST file in the metrics.
                LOG.error("Could not get inode for {}, using file size directly: {}",
                       path, e.getMessage());
                   totalSize += Files.size(path);
                   if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) {
                     sstFileCount++;
                   }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  OMSnapshotDirectoryMetrics(ConfigurationSource conf,
+      OMMetadataManager metadataManager) {
+    super("OMSnapshotDirectoryMetrics",
+        conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(ConfigurationSource conf,
+      String parent, OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, parent,
+        new OMSnapshotDirectoryMetrics(conf, metadataManager));
+  }
+
+  /**
+   * @return if the update was successful.
+   * Updates all metrics synchronously - both aggregate and per-checkpoint-directory.
+   */
+  @Override
+  protected boolean updateMetrics() {
+    DBStore store = metadataManager.getStore();
+    if (!(store instanceof RDBStore)) {
+      LOG.debug("Store is not RDBStore, skipping snapshot directory metrics 
update");
+      resetMetrics();
+      return false;
+    }
+
+    String snapshotsParentDir = store.getSnapshotsParentDir();
+
+    if (snapshotsParentDir == null) {
+      resetMetrics();
+      return false;
+    }
+
+    File snapshotsDir = new File(snapshotsParentDir);
+    if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) {
+      resetMetrics();
+      return false;
+    }
+
+    try {
+      // Calculate aggregate metrics
+      calculateAndUpdateMetrics(snapshotsDir);
+    } catch (Exception e) {
+      LOG.warn("Error calculating snapshot directory metrics", e);
+      resetMetrics();
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Calculates & updates directory size metrics accounting for hardlinks.
+   * (only counts each inode once).
+   * Uses Files.getAttribute to get the inode number and tracks visited inodes.
+   *
+   * @param directory the directory containing all checkpointDirs.
+   */
+  private void calculateAndUpdateMetrics(File directory) throws IOException {
+    Set<Object> visitedInodes = new HashSet<>();
+    long totalSize = 0;
+    long sstFileCount = 0;
+    int snapshotCount = 0;
+    try (Stream<Path> checkpointDirs = Files.list(directory.toPath())) {
+      for (Path checkpointDir : checkpointDirs.collect(Collectors.toList())) {
+        snapshotCount++;
+        try (Stream<Path> files = Files.list(checkpointDir)) {
+          for (Path path : files.collect(Collectors.toList())) {
+            if (Files.isRegularFile(path)) {
+              try {
+                // Get inode number
+                Object fileKey = IOUtils.getINode(path);
+                if (fileKey == null) {
+                  // Fallback: use file path + size as unique identifier
+                  fileKey = path.toAbsolutePath() + ":" + Files.size(path);
+                }
+                // Only count this file if we haven't seen this inode before
+                if (visitedInodes.add(fileKey)) {
+                  if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) {
+                    sstFileCount++;
+                  }
+                  totalSize += Files.size(path);
+                }
+              } catch (UnsupportedOperationException | IOException e) {
+                // Fallback: if we can't get inode, just count the file size.
+                LOG.error("Could not get inode for {}, using file size 
directly: {}",

Review Comment:
   The log level should be WARN instead of ERROR since this is a fallback 
mechanism for non-critical inode retrieval failure. The metrics calculation 
continues and uses file size directly, making this a degraded but still 
functional path rather than an error condition.
   ```suggestion
                   LOG.warn("Could not get inode for {}, using file size 
directly: {}",
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metrics for tracking db.snapshots directory space usage and SST file counts.
+ * Provides both aggregate metrics and per-checkpoint-directory metrics.
+ * Metrics are updated asynchronously to avoid blocking operations.
+ */
[email protected]
+@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE)
+public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implements MetricsSource {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class);
+  private static final String SOURCE_NAME =
+      OMSnapshotDirectoryMetrics.class.getSimpleName();
+
+  // Aggregate metrics
+  private @Metric MutableGaugeLong dbSnapshotsDirSize;
+  private @Metric MutableGaugeLong totalSstFilesCount;
+  private @Metric MutableGaugeLong numSnapshots;
+
+  private final OMMetadataManager metadataManager;
+  private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+  OMSnapshotDirectoryMetrics(ConfigurationSource conf,
+      OMMetadataManager metadataManager) {
+    super("OMSnapshotDirectoryMetrics",
+        conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL,
+        OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    this.metadataManager = metadataManager;
+  }
+
+  public static OMSnapshotDirectoryMetrics create(ConfigurationSource conf,
+      String parent, OMMetadataManager metadataManager) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, parent,
+        new OMSnapshotDirectoryMetrics(conf, metadataManager));
+  }
+
+  /**
+   * @return if the update was successful.
+   * Updates all metrics synchronously - both aggregate and per-checkpoint-directory.
+   */

Review Comment:
   Access of an element annotated with @VisibleForTesting found in production code.
   ```suggestion
      */
     @VisibleForTesting
   ```


