Copilot commented on code in PR #9406: URL: https://github.com/apache/ozone/pull/9406#discussion_r2577972822
########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; 
+import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. + */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) + /** + * Starts
the periodic metrics update task. + * + * @param conf OzoneConfiguration for reading update interval + */ + public void start(OzoneConfiguration conf) { + long updateInterval = conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true); + updateTimer.schedule(new TimerTask() { + @Override + public void run() { + updateMetricsAsync(); + } + }, 0, updateInterval); + + // Do initial update + updateMetricsAsync(); Review Comment: Initial metrics update is triggered twice: once by the explicit `updateMetricsAsync()` call in `start()` at line 105, and once by the timer's first run, which fires immediately (delay=0 at line 102). Because the in-flight check in `updateMetricsAsync()` is not atomic, it does not reliably suppress the second call, so this can cause duplicate computation at startup. Consider removing line 105 and relying solely on the timer's initial execution. ```suggestion // Removed redundant initial update; timer will execute immediately. ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java: ########## @@ -261,6 +264,37 @@ public DBCheckpointMetrics getDBCheckpointMetrics() { return dbCheckpointMetrics; } + // Add getter with lazy initialization check + public OMSnapshotDirectoryMetrics getSnapshotDirectoryMetrics() { + if (snapshotDirectoryMetrics == null) { + throw new IllegalStateException( + "SnapshotDirectoryMetrics not initialized. Call startSnapshotDirectoryMetrics() first."); + } + return snapshotDirectoryMetrics; + } + + /** + * Starts periodic updates for snapshot directory metrics. + * Creates the metrics instance if it doesn't exist.
+ * + * @param configuration OzoneConfiguration for reading update interval + * @param metadataManager OMMetadataManager for accessing snapshot directories + */ + public void startSnapshotDirectoryMetrics(OzoneConfiguration configuration, + OMMetadataManager metadataManager) { + if (snapshotDirectoryMetrics == null) { + snapshotDirectoryMetrics = OMSnapshotDirectoryMetrics.create("OM Metrics", metadataManager); + } + snapshotDirectoryMetrics.start(configuration); + } + + // Add stop method (for cleanup) Review Comment: The comment "Add stop method (for cleanup)" appears to be a development note and should be removed. The JavaDoc or method name should be sufficient to explain its purpose. ```suggestion ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java: ########## @@ -261,6 +264,37 @@ public DBCheckpointMetrics getDBCheckpointMetrics() { return dbCheckpointMetrics; } + // Add getter with lazy initialization check + public OMSnapshotDirectoryMetrics getSnapshotDirectoryMetrics() { + if (snapshotDirectoryMetrics == null) { + throw new IllegalStateException( + "SnapshotDirectoryMetrics not initialized. Call startSnapshotDirectoryMetrics() first."); + } + return snapshotDirectoryMetrics; + } Review Comment: The `getSnapshotDirectoryMetrics()` method throws an `IllegalStateException` if metrics are not initialized, but this exception is not documented in JavaDoc. Consider adding a `@throws` tag to document this behavior for API consumers. ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; 
+import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. + */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) + /** + * Starts the periodic metrics update task.
+ * + * @param conf OzoneConfiguration for reading update interval + */ + public void start(OzoneConfiguration conf) { + long updateInterval = conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true); + updateTimer.schedule(new TimerTask() { + @Override + public void run() { + updateMetricsAsync(); + } + }, 0, updateInterval); + + // Do initial update + updateMetricsAsync(); + } + + /** + * Stops the periodic metrics update task. + */ + public void stop() { + if (updateTimer != null) { + updateTimer.cancel(); + updateTimer = null; + } + } + + public void unRegister() { + stop(); + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + /** + * Internal class to store per-checkpoint metrics. + */ + private static class CheckpointMetrics { + private final long size; + private final int sstFileCount; + + CheckpointMetrics(long size, int sstFileCount) { + this.size = size; + this.sstFileCount = sstFileCount; + } + + public long getSize() { + return size; + } + + public int getSstFileCount() { + return sstFileCount; + } + } + + private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) { + this.metadataManager = metadataManager; + } + + public static OMSnapshotDirectoryMetrics create(String parent, + OMMetadataManager metadataManager) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + parent, + new OMSnapshotDirectoryMetrics(metadataManager)); + } + + /** + * Updates all metrics (aggregate and per-checkpoint) asynchronously + * in a background thread. 
+ */ + public void updateMetricsAsync() { + CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get(); + if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) { + return; + } + + CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> { + try { + updateMetrics(); + lastUpdateTime.set(System.currentTimeMillis()); + } catch (Exception e) { + LOG.warn("Failed to update snapshot directory metrics", e); + } finally { + currentUpdateFutureRef.set(null); + } + }); + + currentUpdateFutureRef.set(newFuture); + } + + /** + * Updates all metrics synchronously - both aggregate and per-checkpoint-directory. + */ + @VisibleForTesting + void updateMetrics() throws IOException { + DBStore store = metadataManager.getStore(); + if (!(store instanceof RDBStore)) { + LOG.debug("Store is not RDBStore, skipping snapshot directory metrics update"); + resetMetrics(); + return; + } + + RDBStore rdbStore = (RDBStore) store; + String snapshotsParentDir = rdbStore.getSnapshotsParentDir(); + + if (snapshotsParentDir == null) { + resetMetrics(); + return; + } + + File snapshotsDir = new File(snapshotsParentDir); + if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) { + resetMetrics(); + return; + } + + try { + // Calculate aggregate metrics + long totalSize = FileUtils.sizeOfDirectory(snapshotsDir); + dbSnapshotsDirSize.set(totalSize); + + // Calculate per-checkpoint-directory metrics and aggregate totals + File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory); + int totalSstCount = 0; + int snapshotCount = 0; + Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>(); + + if (checkpointDirs != null) { + snapshotCount = checkpointDirs.length; + + for (File checkpointDir : checkpointDirs) { + if (!checkpointDir.isDirectory()) { + continue; + } + + String checkpointDirName = checkpointDir.getName(); + long checkpointSize = 0; + int sstFileCount = 0; + + try { + checkpointSize = FileUtils.sizeOfDirectory(checkpointDir); + 
File[] sstFiles = checkpointDir.listFiles((dir, name) -> + name.toLowerCase().endsWith(ROCKSDB_SST_SUFFIX)); + if (sstFiles != null) { + sstFileCount = sstFiles.length; + } + } catch (Exception e) { + LOG.debug("Error calculating metrics for checkpoint directory {}", + checkpointDirName, e); + // Continue with other directories even if one fails + continue; + } + + totalSstCount += sstFileCount; + newCheckpointMetricsMap.put(checkpointDirName, + new CheckpointMetrics(checkpointSize, sstFileCount)); + } + } + + // Update aggregate metrics + totalSstFilesCount.set(totalSstCount); + numSnapshots.set(snapshotCount); + + // Atomically update per-checkpoint metrics map + checkpointMetricsMap = newCheckpointMetricsMap; + + if (LOG.isDebugEnabled()) { + LOG.debug("Updated snapshot directory metrics: size={}, sstFiles={}, snapshots={}", + totalSize, totalSstCount, snapshotCount); + } + + } catch (Exception e) { + LOG.warn("Error calculating snapshot directory metrics", e); + resetMetrics(); + } + } + + /** + * Resets all metrics to zero. + */ + private void resetMetrics() { + dbSnapshotsDirSize.set(0); + totalSstFilesCount.set(0); + numSnapshots.set(0); + checkpointMetricsMap = new HashMap<>(); + } + + /** + * Implements MetricsSource to provide metrics. + * Reads from cached values updated by updateMetrics(). 
+ */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + // Add aggregate metrics + collector.addRecord(SOURCE_NAME) + .setContext("Snapshot Directory Metrics") + .addGauge(SnapshotMetricsInfo.DbSnapshotsDirSize, dbSnapshotsDirSize.value()) + .addGauge(SnapshotMetricsInfo.TotalSstFilesCount, totalSstFilesCount.value()) + .addGauge(SnapshotMetricsInfo.NumSnapshots, numSnapshots.value()); + + // Add per-checkpoint-directory metrics from cached map + Map<String, CheckpointMetrics> currentMetrics = checkpointMetricsMap; + for (Map.Entry<String, CheckpointMetrics> entry : currentMetrics.entrySet()) { + String checkpointDirName = entry.getKey(); + CheckpointMetrics metrics = entry.getValue(); + + collector.addRecord(SOURCE_NAME) + .setContext("Per-Checkpoint Directory Metrics") + .tag(SnapshotMetricsInfo.CheckpointDirName, checkpointDirName) + .addGauge(SnapshotMetricsInfo.CheckpointDirSize, metrics.getSize()) + .addGauge(SnapshotMetricsInfo.CheckpointSstFilesCount, metrics.getSstFileCount()); + } + } + + @VisibleForTesting + public long getDbSnapshotsDirSize() { + return dbSnapshotsDirSize.value(); + } + + @VisibleForTesting + public long getTotalSstFilesCount() { + return totalSstFilesCount.value(); + } + + @VisibleForTesting + public long getNumSnapshots() { + return numSnapshots.value(); + } + + @VisibleForTesting + public long getLastUpdateTime() { + return lastUpdateTime.get(); + } + + @VisibleForTesting + public Map<String, CheckpointMetrics> getCheckpointMetricsMap() { + return Collections.unmodifiableMap(new HashMap<>(checkpointMetricsMap)); + } + + /** + * Metrics info enum for snapshot directory metrics. 
+ */ + enum SnapshotMetricsInfo implements MetricsInfo { + // Aggregate metrics + DbSnapshotsDirSize("Total size of db.snapshots directory in bytes"), + TotalSstFilesCount("Total number of SST files across all snapshots"), + NumSnapshots("Total number of snapshot checkpoint directories"), + + // Per-checkpoint-directory metric tag + CheckpointDirName("Checkpoint directory name"), + + // Per-checkpoint-directory metrics + CheckpointDirSize("Size of checkpoint directory in bytes"), + CheckpointSstFilesCount("Number of SST files in checkpoint directory"); + + private final String desc; + + SnapshotMetricsInfo(String desc) { + this.desc = desc; + } + + @Override + public String description() { + return desc; + } + } +} Review Comment: The new `OMSnapshotDirectoryMetrics` class lacks test coverage. Given that there are comprehensive tests for other snapshot-related classes in `hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/`, this new metrics class should have unit tests covering: - Async metrics update behavior - Directory size and SST file counting logic - Error handling when directories don't exist - Timer lifecycle (start/stop) - Concurrent update prevention - Per-checkpoint metrics collection Consider adding a `TestOMSnapshotDirectoryMetrics` test class. ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import 
org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. + */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) + /** + * Starts the periodic metrics update task.
+ * + * @param conf OzoneConfiguration for reading update interval + */ + public void start(OzoneConfiguration conf) { + long updateInterval = conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true); + updateTimer.schedule(new TimerTask() { + @Override + public void run() { + updateMetricsAsync(); + } + }, 0, updateInterval); + + // Do initial update + updateMetricsAsync(); + } + + /** + * Stops the periodic metrics update task. + */ + public void stop() { + if (updateTimer != null) { + updateTimer.cancel(); + updateTimer = null; + } + } + + public void unRegister() { + stop(); + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + /** + * Internal class to store per-checkpoint metrics. + */ + private static class CheckpointMetrics { + private final long size; + private final int sstFileCount; + + CheckpointMetrics(long size, int sstFileCount) { + this.size = size; + this.sstFileCount = sstFileCount; + } + + public long getSize() { + return size; + } + + public int getSstFileCount() { + return sstFileCount; + } + } + + private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) { + this.metadataManager = metadataManager; + } + + public static OMSnapshotDirectoryMetrics create(String parent, + OMMetadataManager metadataManager) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + parent, + new OMSnapshotDirectoryMetrics(metadataManager)); + } + + /** + * Updates all metrics (aggregate and per-checkpoint) asynchronously + * in a background thread. 
+ */ + public void updateMetricsAsync() { + CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get(); + if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) { + return; + } + + CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> { + try { + updateMetrics(); + lastUpdateTime.set(System.currentTimeMillis()); + } catch (Exception e) { + LOG.warn("Failed to update snapshot directory metrics", e); + } finally { + currentUpdateFutureRef.set(null); + } + }); + + currentUpdateFutureRef.set(newFuture); Review Comment: The `currentUpdateFutureRef` is cleared to `null` in the task's `finally` block (line 174), which can run before the caller publishes the new future at line 178. In addition, the in-flight check is a non-atomic `get()` followed by a later `set()`. This creates a race condition where: 1. The async task started by call A finishes quickly and clears the ref to null (line 174) before call A has published its future. 2. Call B runs `updateMetricsAsync()`, sees null, starts a new task, and publishes its future (line 178). 3. Call A then reaches line 178 and overwrites B's still-running future with its own already-completed one, so a later call sees a done future and starts another update while B's is still running. Separately, two callers that run concurrently can both pass the `get()` check and both start an update. Either case can cause concurrent update executions. Consider setting the future reference before starting the async task, or using `compareAndSet` to atomically update the reference. ```suggestion } // No need to set currentUpdateFutureRef to null here. } }); // Atomically set the new future only if the previous is null or done. while (true) { CompletableFuture<Void> prev = currentUpdateFutureRef.get(); if (prev == null || prev.isDone()) { if (currentUpdateFutureRef.compareAndSet(prev, newFuture)) { break; } } else { // Another update started, do not overwrite. break; } } ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; 
+import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. + */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) + /** + * Starts the periodic metrics update task.
+ * + * @param conf OzoneConfiguration for reading update interval + */ + public void start(OzoneConfiguration conf) { + long updateInterval = conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true); + updateTimer.schedule(new TimerTask() { + @Override + public void run() { + updateMetricsAsync(); + } + }, 0, updateInterval); + + // Do initial update + updateMetricsAsync(); + } Review Comment: Calling `start()` multiple times creates multiple Timer instances without canceling the previous one, causing a Timer leak. The old timer will continue running even after a new one is created. Consider checking if `updateTimer` is already non-null and either throwing an exception or canceling the old timer before creating a new one. ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. 
+ */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) + /** + * Starts the periodic metrics update task. + * + * @param conf OzoneConfiguration for reading update interval + */ + public void start(OzoneConfiguration conf) { + long updateInterval = conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true); + updateTimer.schedule(new TimerTask() { + @Override + public void run() { + updateMetricsAsync(); + } + }, 0, updateInterval); + + // Do initial update + updateMetricsAsync(); + } + + /** + * Stops the periodic metrics update task.
+ */ + public void stop() { + if (updateTimer != null) { + updateTimer.cancel(); + updateTimer = null; + } + } + + public void unRegister() { + stop(); + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + /** + * Internal class to store per-checkpoint metrics. + */ + private static class CheckpointMetrics { + private final long size; + private final int sstFileCount; + + CheckpointMetrics(long size, int sstFileCount) { + this.size = size; + this.sstFileCount = sstFileCount; + } + + public long getSize() { + return size; + } + + public int getSstFileCount() { + return sstFileCount; + } + } + + private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) { + this.metadataManager = metadataManager; + } + + public static OMSnapshotDirectoryMetrics create(String parent, + OMMetadataManager metadataManager) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + parent, + new OMSnapshotDirectoryMetrics(metadataManager)); + } + + /** + * Updates all metrics (aggregate and per-checkpoint) asynchronously + * in a background thread. + */ + public void updateMetricsAsync() { + CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get(); + if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) { + return; + } + + CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> { + try { + updateMetrics(); + lastUpdateTime.set(System.currentTimeMillis()); + } catch (Exception e) { + LOG.warn("Failed to update snapshot directory metrics", e); + } finally { + currentUpdateFutureRef.set(null); + } + }); + + currentUpdateFutureRef.set(newFuture); + } + + /** + * Updates all metrics synchronously - both aggregate and per-checkpoint-directory. 
+ */ + @VisibleForTesting + void updateMetrics() throws IOException { + DBStore store = metadataManager.getStore(); + if (!(store instanceof RDBStore)) { + LOG.debug("Store is not RDBStore, skipping snapshot directory metrics update"); + resetMetrics(); + return; + } + + RDBStore rdbStore = (RDBStore) store; + String snapshotsParentDir = rdbStore.getSnapshotsParentDir(); + + if (snapshotsParentDir == null) { + resetMetrics(); + return; + } + + File snapshotsDir = new File(snapshotsParentDir); + if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) { + resetMetrics(); + return; + } + + try { + // Calculate aggregate metrics + long totalSize = FileUtils.sizeOfDirectory(snapshotsDir); + dbSnapshotsDirSize.set(totalSize); + + // Calculate per-checkpoint-directory metrics and aggregate totals + File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory); + int totalSstCount = 0; + int snapshotCount = 0; + Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>(); + + if (checkpointDirs != null) { + snapshotCount = checkpointDirs.length; + + for (File checkpointDir : checkpointDirs) { + if (!checkpointDir.isDirectory()) { + continue; + } + + String checkpointDirName = checkpointDir.getName(); + long checkpointSize = 0; + int sstFileCount = 0; + + try { + checkpointSize = FileUtils.sizeOfDirectory(checkpointDir); + File[] sstFiles = checkpointDir.listFiles((dir, name) -> + name.toLowerCase().endsWith(ROCKSDB_SST_SUFFIX)); + if (sstFiles != null) { + sstFileCount = sstFiles.length; + } + } catch (Exception e) { + LOG.debug("Error calculating metrics for checkpoint directory {}", + checkpointDirName, e); + // Continue with other directories even if one fails + continue; + } + + totalSstCount += sstFileCount; + newCheckpointMetricsMap.put(checkpointDirName, + new CheckpointMetrics(checkpointSize, sstFileCount)); + } + } + + // Update aggregate metrics + totalSstFilesCount.set(totalSstCount); + numSnapshots.set(snapshotCount); + + // Atomically 
update per-checkpoint metrics map + checkpointMetricsMap = newCheckpointMetricsMap; Review Comment: The HashMap instantiation is not thread-safe when read by `getMetrics()`. While the volatile reference ensures visibility of the map reference itself, there's a small window where `checkpointMetricsMap` could be partially constructed. Consider using `Collections.unmodifiableMap()` to wrap the HashMap before assignment to ensure safe publication. ```suggestion checkpointMetricsMap = Collections.unmodifiableMap(newCheckpointMetricsMap); ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java: ########## @@ -261,6 +264,37 @@ public DBCheckpointMetrics getDBCheckpointMetrics() { return dbCheckpointMetrics; } + // Add getter with lazy initialization check Review Comment: The comment "Add getter with lazy initialization check" appears to be a development note and should be removed. The JavaDoc below already documents the method's behavior. ```suggestion ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. 
+ */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) Review Comment: The comment "Add start method (after unRegister method)" appears to be a note for positioning the method during development. It should be removed as it doesn't provide meaningful documentation about the method's purpose. ```suggestion ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import 
org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. + */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: Review Comment: The comment "Change the field declaration:" appears to be leftover from development. It should be removed as it doesn't add value to the code documentation. ```suggestion ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java: ########## @@ -1950,6 +1954,9 @@ public void restart() throws IOException { metricsTimer = new Timer(); metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period); + // Start snapshot directory metrics updates Review Comment: The `startSnapshotDirectoryMetrics()` method is called in both `start()` (line 1873) and `restart()` (line 1958). If `restart()` is called after `start()`, the metrics instance will call `start()` again (line 288 in OMMetrics), which will create a new Timer without canceling the old one (see related issue in OMSnapshotDirectoryMetrics). This could lead to multiple concurrent timer threads updating the same metrics.
Consider checking if the metrics are already started before calling `start()` again, or ensuring proper cleanup before restart. ```suggestion // Start snapshot directory metrics updates metrics.stopSnapshotDirectoryMetrics(); ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. 
+ */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) + /** + * Starts the periodic metrics update task. + * + * @param conf OzoneConfiguration for reading update interval + */ + public void start(OzoneConfiguration conf) { + long updateInterval = conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true); + updateTimer.schedule(new TimerTask() { + @Override + public void run() { + updateMetricsAsync(); + } + }, 0, updateInterval); + + // Do initial update + updateMetricsAsync(); + } + + /** + * Stops the periodic metrics update task.
+ */ + public void stop() { + if (updateTimer != null) { + updateTimer.cancel(); + updateTimer = null; + } + } + + public void unRegister() { + stop(); + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + /** + * Internal class to store per-checkpoint metrics. + */ + private static class CheckpointMetrics { + private final long size; + private final int sstFileCount; + + CheckpointMetrics(long size, int sstFileCount) { + this.size = size; + this.sstFileCount = sstFileCount; + } + + public long getSize() { + return size; + } + + public int getSstFileCount() { + return sstFileCount; + } + } + + private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) { + this.metadataManager = metadataManager; + } + + public static OMSnapshotDirectoryMetrics create(String parent, + OMMetadataManager metadataManager) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + parent, + new OMSnapshotDirectoryMetrics(metadataManager)); + } + + /** + * Updates all metrics (aggregate and per-checkpoint) asynchronously + * in a background thread. + */ + public void updateMetricsAsync() { + CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get(); + if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) { + return; + } + + CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> { + try { + updateMetrics(); + lastUpdateTime.set(System.currentTimeMillis()); + } catch (Exception e) { + LOG.warn("Failed to update snapshot directory metrics", e); + } finally { + currentUpdateFutureRef.set(null); + } + }); + + currentUpdateFutureRef.set(newFuture); + } + + /** + * Updates all metrics synchronously - both aggregate and per-checkpoint-directory. 
+ */ + @VisibleForTesting + void updateMetrics() throws IOException { + DBStore store = metadataManager.getStore(); + if (!(store instanceof RDBStore)) { + LOG.debug("Store is not RDBStore, skipping snapshot directory metrics update"); + resetMetrics(); + return; + } + + RDBStore rdbStore = (RDBStore) store; + String snapshotsParentDir = rdbStore.getSnapshotsParentDir(); + + if (snapshotsParentDir == null) { + resetMetrics(); + return; + } + + File snapshotsDir = new File(snapshotsParentDir); + if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) { + resetMetrics(); + return; + } + + try { + // Calculate aggregate metrics + long totalSize = FileUtils.sizeOfDirectory(snapshotsDir); + dbSnapshotsDirSize.set(totalSize); + + // Calculate per-checkpoint-directory metrics and aggregate totals + File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory); + int totalSstCount = 0; + int snapshotCount = 0; + Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>(); + + if (checkpointDirs != null) { + snapshotCount = checkpointDirs.length; + + for (File checkpointDir : checkpointDirs) { + if (!checkpointDir.isDirectory()) { + continue; + } Review Comment: [nitpick] The `isDirectory()` check at line 222 is redundant because `listFiles(File::isDirectory)` at line 213 already filters for directories only. The check can be safely removed to simplify the code. ```suggestion ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import 
org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. + */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) + /** + * Starts the periodic metrics update task.
+ * + * @param conf OzoneConfiguration for reading update interval + */ + public void start(OzoneConfiguration conf) { + long updateInterval = conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true); + updateTimer.schedule(new TimerTask() { + @Override + public void run() { + updateMetricsAsync(); + } + }, 0, updateInterval); + + // Do initial update + updateMetricsAsync(); + } + + /** + * Stops the periodic metrics update task. + */ + public void stop() { + if (updateTimer != null) { + updateTimer.cancel(); + updateTimer = null; + } + } + + public void unRegister() { + stop(); + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + /** + * Internal class to store per-checkpoint metrics. + */ + private static class CheckpointMetrics { + private final long size; + private final int sstFileCount; + + CheckpointMetrics(long size, int sstFileCount) { + this.size = size; + this.sstFileCount = sstFileCount; + } + + public long getSize() { + return size; + } + + public int getSstFileCount() { + return sstFileCount; + } + } + + private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) { + this.metadataManager = metadataManager; + } + + public static OMSnapshotDirectoryMetrics create(String parent, + OMMetadataManager metadataManager) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + parent, + new OMSnapshotDirectoryMetrics(metadataManager)); + } + + /** + * Updates all metrics (aggregate and per-checkpoint) asynchronously + * in a background thread. 
+ */ + public void updateMetricsAsync() { + CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get(); + if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) { + return; + } + + CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> { + try { + updateMetrics(); + lastUpdateTime.set(System.currentTimeMillis()); + } catch (Exception e) { + LOG.warn("Failed to update snapshot directory metrics", e); + } finally { + currentUpdateFutureRef.set(null); + } + }); + + currentUpdateFutureRef.set(newFuture); + } + + /** + * Updates all metrics synchronously - both aggregate and per-checkpoint-directory. + */ + @VisibleForTesting + void updateMetrics() throws IOException { + DBStore store = metadataManager.getStore(); + if (!(store instanceof RDBStore)) { + LOG.debug("Store is not RDBStore, skipping snapshot directory metrics update"); + resetMetrics(); + return; + } + + RDBStore rdbStore = (RDBStore) store; + String snapshotsParentDir = rdbStore.getSnapshotsParentDir(); + + if (snapshotsParentDir == null) { + resetMetrics(); + return; + } + + File snapshotsDir = new File(snapshotsParentDir); + if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) { + resetMetrics(); + return; + } + + try { + // Calculate aggregate metrics + long totalSize = FileUtils.sizeOfDirectory(snapshotsDir); + dbSnapshotsDirSize.set(totalSize); + + // Calculate per-checkpoint-directory metrics and aggregate totals + File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory); + int totalSstCount = 0; + int snapshotCount = 0; + Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>(); + + if (checkpointDirs != null) { + snapshotCount = checkpointDirs.length; + + for (File checkpointDir : checkpointDirs) { + if (!checkpointDir.isDirectory()) { + continue; + } + + String checkpointDirName = checkpointDir.getName(); + long checkpointSize = 0; + int sstFileCount = 0; + + try { + checkpointSize = FileUtils.sizeOfDirectory(checkpointDir); + 
File[] sstFiles = checkpointDir.listFiles((dir, name) -> + name.toLowerCase().endsWith(ROCKSDB_SST_SUFFIX)); + if (sstFiles != null) { + sstFileCount = sstFiles.length; + } + } catch (Exception e) { + LOG.debug("Error calculating metrics for checkpoint directory {}", + checkpointDirName, e); + // Continue with other directories even if one fails + continue; + } + + totalSstCount += sstFileCount; + newCheckpointMetricsMap.put(checkpointDirName, + new CheckpointMetrics(checkpointSize, sstFileCount)); + } + } + + // Update aggregate metrics + totalSstFilesCount.set(totalSstCount); + numSnapshots.set(snapshotCount); + + // Atomically update per-checkpoint metrics map + checkpointMetricsMap = newCheckpointMetricsMap; + + if (LOG.isDebugEnabled()) { + LOG.debug("Updated snapshot directory metrics: size={}, sstFiles={}, snapshots={}", + totalSize, totalSstCount, snapshotCount); + } + + } catch (Exception e) { + LOG.warn("Error calculating snapshot directory metrics", e); + resetMetrics(); + } + } + + /** + * Resets all metrics to zero. + */ + private void resetMetrics() { + dbSnapshotsDirSize.set(0); + totalSstFilesCount.set(0); + numSnapshots.set(0); + checkpointMetricsMap = new HashMap<>(); + } + + /** + * Implements MetricsSource to provide metrics. + * Reads from cached values updated by updateMetrics(). 
+ */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + // Add aggregate metrics + collector.addRecord(SOURCE_NAME) + .setContext("Snapshot Directory Metrics") + .addGauge(SnapshotMetricsInfo.DbSnapshotsDirSize, dbSnapshotsDirSize.value()) + .addGauge(SnapshotMetricsInfo.TotalSstFilesCount, totalSstFilesCount.value()) + .addGauge(SnapshotMetricsInfo.NumSnapshots, numSnapshots.value()); + + // Add per-checkpoint-directory metrics from cached map + Map<String, CheckpointMetrics> currentMetrics = checkpointMetricsMap; + for (Map.Entry<String, CheckpointMetrics> entry : currentMetrics.entrySet()) { + String checkpointDirName = entry.getKey(); + CheckpointMetrics metrics = entry.getValue(); + + collector.addRecord(SOURCE_NAME) + .setContext("Per-Checkpoint Directory Metrics") + .tag(SnapshotMetricsInfo.CheckpointDirName, checkpointDirName) + .addGauge(SnapshotMetricsInfo.CheckpointDirSize, metrics.getSize()) + .addGauge(SnapshotMetricsInfo.CheckpointSstFilesCount, metrics.getSstFileCount()); + } + } + + @VisibleForTesting + public long getDbSnapshotsDirSize() { + return dbSnapshotsDirSize.value(); + } + + @VisibleForTesting + public long getTotalSstFilesCount() { + return totalSstFilesCount.value(); + } + + @VisibleForTesting + public long getNumSnapshots() { + return numSnapshots.value(); + } + + @VisibleForTesting + public long getLastUpdateTime() { + return lastUpdateTime.get(); + } + + @VisibleForTesting + public Map<String, CheckpointMetrics> getCheckpointMetricsMap() { + return Collections.unmodifiableMap(new HashMap<>(checkpointMetricsMap)); Review Comment: [nitpick] Creating a new HashMap copy in `getCheckpointMetricsMap()` for every call (even for test purposes) is inefficient. Since the map is already volatile and the `CheckpointMetrics` objects are immutable, you could return `Collections.unmodifiableMap(checkpointMetricsMap)` directly without the extra HashMap copy. The volatile guarantee ensures visibility of the reference. 
```suggestion return Collections.unmodifiableMap(checkpointMetricsMap); ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. 
+ */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); Review Comment: The `registry` field is declared but never used in the class. It should either be removed or utilized for metrics registration if that was the intended design pattern. Currently, metrics are registered through annotations (@Metric) and the custom `getMetrics()` implementation. ```suggestion ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import 
org.apache.hadoop.ozone.om.OMMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides both aggregate metrics and per-checkpoint-directory metrics. + * Metrics are updated asynchronously to avoid blocking operations. + */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final AtomicLong lastUpdateTime = new AtomicLong(0); + // Change the field declaration: + private final AtomicReference<CompletableFuture<Void>> currentUpdateFutureRef = + new AtomicReference<>(); + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + // Per-checkpoint-directory metrics storage + private volatile Map<String, CheckpointMetrics> checkpointMetricsMap = new HashMap<>(); + + private Timer updateTimer; + + // Add start method (after unRegister method) + /** + * Starts the periodic metrics update task.
+ * + * @param conf OzoneConfiguration for reading update interval + */ + public void start(OzoneConfiguration conf) { + long updateInterval = conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + updateTimer = new Timer("OMSnapshotDirectoryMetricsUpdate", true); + updateTimer.schedule(new TimerTask() { + @Override + public void run() { + updateMetricsAsync(); + } + }, 0, updateInterval); + + // Do initial update + updateMetricsAsync(); + } + + /** + * Stops the periodic metrics update task. + */ + public void stop() { + if (updateTimer != null) { + updateTimer.cancel(); + updateTimer = null; + } + } + + public void unRegister() { + stop(); + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + /** + * Internal class to store per-checkpoint metrics. + */ + private static class CheckpointMetrics { + private final long size; + private final int sstFileCount; + + CheckpointMetrics(long size, int sstFileCount) { + this.size = size; + this.sstFileCount = sstFileCount; + } + + public long getSize() { + return size; + } + + public int getSstFileCount() { + return sstFileCount; + } + } + + private OMSnapshotDirectoryMetrics(OMMetadataManager metadataManager) { + this.metadataManager = metadataManager; + } + + public static OMSnapshotDirectoryMetrics create(String parent, + OMMetadataManager metadataManager) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + parent, + new OMSnapshotDirectoryMetrics(metadataManager)); + } + + /** + * Updates all metrics (aggregate and per-checkpoint) asynchronously + * in a background thread. 
+ */ + public void updateMetricsAsync() { + CompletableFuture<Void> currentUpdateFuture = currentUpdateFutureRef.get(); + if (currentUpdateFuture != null && !currentUpdateFuture.isDone()) { + return; + } + + CompletableFuture<Void> newFuture = CompletableFuture.runAsync(() -> { + try { + updateMetrics(); + lastUpdateTime.set(System.currentTimeMillis()); + } catch (Exception e) { + LOG.warn("Failed to update snapshot directory metrics", e); + } finally { + currentUpdateFutureRef.set(null); + } + }); + + currentUpdateFutureRef.set(newFuture); + } + + /** + * Updates all metrics synchronously - both aggregate and per-checkpoint-directory. + */ + @VisibleForTesting + void updateMetrics() throws IOException { + DBStore store = metadataManager.getStore(); + if (!(store instanceof RDBStore)) { + LOG.debug("Store is not RDBStore, skipping snapshot directory metrics update"); + resetMetrics(); + return; + } + + RDBStore rdbStore = (RDBStore) store; + String snapshotsParentDir = rdbStore.getSnapshotsParentDir(); + + if (snapshotsParentDir == null) { + resetMetrics(); + return; + } + + File snapshotsDir = new File(snapshotsParentDir); + if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) { + resetMetrics(); + return; + } + + try { + // Calculate aggregate metrics + long totalSize = FileUtils.sizeOfDirectory(snapshotsDir); + dbSnapshotsDirSize.set(totalSize); + + // Calculate per-checkpoint-directory metrics and aggregate totals + File[] checkpointDirs = snapshotsDir.listFiles(File::isDirectory); + int totalSstCount = 0; + int snapshotCount = 0; + Map<String, CheckpointMetrics> newCheckpointMetricsMap = new HashMap<>(); + + if (checkpointDirs != null) { + snapshotCount = checkpointDirs.length; + + for (File checkpointDir : checkpointDirs) { + if (!checkpointDir.isDirectory()) { + continue; + } + + String checkpointDirName = checkpointDir.getName(); + long checkpointSize = 0; + int sstFileCount = 0; + + try { + checkpointSize = FileUtils.sizeOfDirectory(checkpointDir); + 
File[] sstFiles = checkpointDir.listFiles((dir, name) -> + name.toLowerCase().endsWith(ROCKSDB_SST_SUFFIX)); + if (sstFiles != null) { + sstFileCount = sstFiles.length; + } + } catch (Exception e) { + LOG.debug("Error calculating metrics for checkpoint directory {}", + checkpointDirName, e); + // Continue with other directories even if one fails + continue; + } + + totalSstCount += sstFileCount; + newCheckpointMetricsMap.put(checkpointDirName, + new CheckpointMetrics(checkpointSize, sstFileCount)); Review Comment: [nitpick] The `FileUtils.sizeOfDirectory()` call can be expensive for large directory trees and may hold directory handles during traversal. Additionally, the `listFiles()` operations could potentially fail with unchecked exceptions in edge cases. While the code does catch generic exceptions at line 237 and continues, it would be more robust to add a timeout mechanism or limit the time spent on each directory to prevent blocking the metrics thread for too long. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
