This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d96442bf0 HDDS-13125. Add metrics for monitoring the SST file pruning threads. (#8764)
9d96442bf0 is described below

commit 9d96442bf0d5b784bccbf8da80079ac0c18aef97
Author: SaketaChalamchala <saketa.chalamch...@gmail.com>
AuthorDate: Tue Jul 15 12:20:00 2025 -0700

    HDDS-13125. Add metrics for monitoring the SST file pruning threads. (#8764)
    
    Co-authored-by: saketa <schalamch...@cloudera.com>
---
 hadoop-hdds/rocksdb-checkpoint-differ/pom.xml      |   2 +-
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   |  30 ++++-
 .../ozone/rocksdiff/SSTFilePruningMetrics.java     | 135 +++++++++++++++++++++
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     |  15 ++-
 4 files changed, 178 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index fd8aae95c1..df94ff35af 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -84,7 +84,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 583a3bb7bc..45cd4d6a79 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -172,6 +172,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
       = new BootstrapStateHandler.Lock();
   private static final int SST_READ_AHEAD_SIZE = 2 * 1024 * 1024;
   private int pruneSSTFileBatchSize;
+  private SSTFilePruningMetrics sstFilePruningMetrics;
   private ColumnFamilyHandle snapshotInfoTableCFHandle;
   private static final String DAG_PRUNING_SERVICE_NAME = "CompactionDagPruningService";
   private AtomicBoolean suspended;
@@ -239,10 +240,11 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     this.pruneSSTFileBatchSize = configuration.getInt(
         OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE,
         OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE_DEFAULT);
+    this.sstFilePruningMetrics = SSTFilePruningMetrics.create(activeDBLocationName);
     try {
       if (configuration.getBoolean(OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB, OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB_DEFAULT)
           && ManagedRawSSTFileReader.loadLibrary()) {
-        pruneQueue = new ConcurrentLinkedQueue<>();
+        this.pruneQueue = new ConcurrentLinkedQueue<>();
       }
     } catch (NativeLibraryNotLoadedException e) {
       LOG.warn("Native Library for raw sst file reading loading failed." +
@@ -338,6 +340,9 @@ public void close() {
             LOG.info("Shutting down {}.", DAG_PRUNING_SERVICE_NAME);
             scheduler.close();
           }
+          if (sstFilePruningMetrics != null) {
+            sstFilePruningMetrics.unRegister();
+          }
         }
       }
     }
@@ -532,6 +537,7 @@ public void onCompactionCompleted(RocksDB db,
         // so that the backup input sst files can be pruned.
         if (pruneQueue != null) {
           pruneQueue.offer(key);
+          sstFilePruningMetrics.updateQueueSize(pruneQueue.size());
         }
       }
     };
@@ -751,6 +757,10 @@ private void loadCompactionDagFromDB() {
       }
     } catch (InvalidProtocolBufferException e) {
       throw new RuntimeException(e);
+    } finally {
+      if (pruneQueue != null) {
+        sstFilePruningMetrics.updateQueueSize(pruneQueue.size());
+      }
     }
   }
 
@@ -1258,13 +1268,16 @@ public void pruneSstFileValues() {
     if (!shouldRun()) {
       return;
     }
+    long batchStartTime = System.nanoTime();
+    int filesPrunedInBatch = 0;
+    int filesSkippedInBatch = 0;
+    int batchCounter = 0;
 
     Path sstBackupDirPath = Paths.get(sstBackupDir);
     Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
     try (ManagedOptions managedOptions = new ManagedOptions();
          ManagedEnvOptions envOptions = new ManagedEnvOptions()) {
       byte[] compactionLogEntryKey;
-      int batchCounter = 0;
       while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
         CompactionLogEntry compactionLogEntry;
         // Get the compaction log entry.
@@ -1289,6 +1302,7 @@ public void pruneSstFileValues() {
             if (Files.notExists(sstFilePath)) {
               LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
               updatedFileInfoList.add(fileInfo);
+              filesSkippedInBatch++;
               continue;
             }
 
@@ -1306,6 +1320,7 @@ public void pruneSstFileValues() {
             fileInfo.setPruned();
             updatedFileInfoList.add(fileInfo);
             LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+            filesPrunedInBatch++;
           }
 
           // Update compaction log entry in table.
@@ -1325,6 +1340,12 @@ public void pruneSstFileValues() {
       }
     } catch (IOException | InterruptedException e) {
       LOG.error("Could not prune source OMKeyInfo from backup SST files.", e);
+      sstFilePruningMetrics.incrPruningFailures();
+    } finally {
+      LOG.info("Completed pruning OMKeyInfo from backup SST files in {}ms.",
+          (System.nanoTime() - batchStartTime) / 1_000_000);
+      sstFilePruningMetrics.updateBatchLevelMetrics(filesPrunedInBatch, filesSkippedInBatch,
+          batchCounter, pruneQueue.size());
     }
   }
 
@@ -1428,4 +1449,9 @@ private Map<String, CompactionFileInfo> toFileInfoList(List<String> sstFiles, Ro
   ConcurrentMap<String, CompactionFileInfo> getInflightCompactions() {
     return inflightCompactions;
   }
+
+  @VisibleForTesting
+  public SSTFilePruningMetrics getPruningMetrics() {
+    return sstFilePruningMetrics;
+  }
 }
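
Taken together, the hunks above give the differ ownership of the metrics lifecycle: create() in the constructor, queue-size updates on compaction completion and DAG load, batch totals in the finally block of pruneSstFileValues(), and unRegister() in close(). A minimal self-contained sketch of that sequence (the db path and literal values here are illustrative, not taken from the commit):

    import org.apache.ozone.rocksdiff.SSTFilePruningMetrics;

    public final class PruningMetricsLifecycle {
      public static void main(String[] args) {
        // Register the source; the name is derived from the sanitized db location.
        SSTFilePruningMetrics metrics = SSTFilePruningMetrics.create("/tmp/om-db");
        // onCompactionCompleted() / loadCompactionDagFromDB(): reflect queue depth.
        metrics.updateQueueSize(1);
        // End of one pruneSstFileValues() batch: 1 file pruned, 0 skipped,
        // 1 compaction processed, queue drained.
        metrics.updateBatchLevelMetrics(1, 0, 1, 0);
        // The IOException/InterruptedException path bumps the failure counter.
        metrics.incrPruningFailures();
        // close(): drop the source from the metrics system.
        metrics.unRegister();
      }
    }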
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/SSTFilePruningMetrics.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/SSTFilePruningMetrics.java
new file mode 100644
index 0000000000..46eb2c0889
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/SSTFilePruningMetrics.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdiff;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Class contains metrics for monitoring SST file pruning operations in RocksDBCheckpointDiffer.
+ */
+@Metrics(about = "SST File Pruning Metrics", context = OzoneConsts.OZONE)
+public final class SSTFilePruningMetrics implements MetricsSource {
+
+  private static final String METRICS_SOURCE_NAME_PREFIX = SSTFilePruningMetrics.class.getSimpleName();
+  private final String metricSourceName;
+  private final MetricsRegistry registry;
+
+  /*
+   * Pruning Throughput Metrics.
+   */
+  @Metric("Total no. of SST files pruned")
+  private MutableCounterLong filesPrunedTotal;
+  @Metric("No. of SST files pruned in the last batch")
+  private MutableGaugeLong filesPrunedLast;
+  @Metric("Total no. of SST files removed")
+  private MutableCounterLong filesSkippedTotal;
+  @Metric("Total no. of compactions processed")
+  private MutableCounterLong compactionsProcessed;
+  @Metric("No. of pending pruning jobs in queue")
+  private MutableGaugeLong pruneQueueSize;
+
+  /*
+   * Pruning failure Metrics.
+   */
+  @Metric("No. of pruning job failures")
+  private MutableCounterLong pruningFailures;
+
+  private SSTFilePruningMetrics(String sourceName) {
+    this.metricSourceName = sourceName;
+    this.registry = new MetricsRegistry(metricSourceName);
+  }
+
+  /**
+   * Creates and returns SSTFilePruningMetrics instance.
+   *
+   * @return SSTFilePruningMetrics
+   */
+  public static SSTFilePruningMetrics create(String dbLocation) {
+    String sourceName = METRICS_SOURCE_NAME_PREFIX +
+        (dbLocation == null || dbLocation.isEmpty() ? "" : "-" + dbLocation.replaceAll("[/\\:\\s]", "_"));
+    return DefaultMetricsSystem.instance().register(sourceName, "SST File Pruning Metrics",
+        new SSTFilePruningMetrics(sourceName));
+  }
+
+  /**
+   * Unregister the metrics instance.
+   */
+  public void unRegister() {
+    DefaultMetricsSystem.instance().unregisterSource(metricSourceName);
+  }
+
+  public void updateQueueSize(long queueSize) {
+    pruneQueueSize.set(queueSize);
+  }
+
+  public void updateBatchLevelMetrics(long filesPruned, long filesSkipped, int compactions, long queueSize) {
+    filesPrunedTotal.incr(filesPruned);
+    filesPrunedLast.set(filesPruned);
+    filesSkippedTotal.incr(filesSkipped);
+    compactionsProcessed.incr(compactions);
+    updateQueueSize(queueSize);
+  }
+
+  public void incrPruningFailures() {
+    pruningFailures.incr();
+  }
+
+  public long getFilesPrunedTotal() {
+    return filesPrunedTotal.value();
+  }
+
+  public long getFilesPrunedLast() {
+    return filesPrunedLast.value();
+  }
+
+  public long getFilesRemovedTotal() {
+    return filesSkippedTotal.value();
+  }
+
+  public long getCompactionsProcessed() {
+    return compactionsProcessed.value();
+  }
+
+  public long getPruneQueueSize() {
+    return pruneQueueSize.value();
+  }
+
+  public long getPruningFailures() {
+    return pruningFailures.value();
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder recordBuilder = collector.addRecord(metricSourceName);
+    filesPrunedTotal.snapshot(recordBuilder, all);
+    filesPrunedLast.snapshot(recordBuilder, all);
+    filesSkippedTotal.snapshot(recordBuilder, all);
+    compactionsProcessed.snapshot(recordBuilder, all);
+    pruneQueueSize.snapshot(recordBuilder, all);
+    pruningFailures.snapshot(recordBuilder, all);
+  }
+}
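
Once registered through DefaultMetricsSystem, the source should surface as a JMX MBean alongside the other Ozone metrics, with each @Metric field published under its capitalized field name (FilesPrunedTotal, FilesPrunedLast, FilesSkippedTotal, CompactionsProcessed, PruneQueueSize, PruningFailures). A hedged probe sketch; the Hadoop:service=*,name=... object-name pattern is an assumption about how Hadoop's metrics system maps sources to MBeans, not something defined in this commit:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public final class PruningMetricsProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed pattern: Hadoop exposes registered sources as
        // Hadoop:service=<prefix>,name=<sourceName>.
        ObjectName pattern = new ObjectName("Hadoop:service=*,name=SSTFilePruningMetrics*");
        for (ObjectName name : server.queryNames(pattern, null)) {
          System.out.println(name
              + " PruneQueueSize=" + server.getAttribute(name, "PruneQueueSize")
              + " PruningFailures=" + server.getAttribute(name, "PruningFailures"));
        }
      }
    }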
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index 1a329647cc..5b6fc39f23 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -1935,6 +1935,12 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB2() {
    */
   @Test
   public void testPruneSSTFileValues() throws Exception {
+    SSTFilePruningMetrics sstFilePruningMetrics = rocksDBCheckpointDiffer.getPruningMetrics();
+    assertEquals(0L, sstFilePruningMetrics.getPruneQueueSize());
+    assertEquals(0L, sstFilePruningMetrics.getFilesPrunedTotal());
+    assertEquals(0L, sstFilePruningMetrics.getFilesPrunedLast());
+    assertEquals(0L, sstFilePruningMetrics.getCompactionsProcessed());
+    assertEquals(0L, sstFilePruningMetrics.getFilesRemovedTotal());
 
     List<Pair<byte[], Integer>> keys = new ArrayList<Pair<byte[], Integer>>();
     keys.add(Pair.of("key1".getBytes(UTF_8), Integer.valueOf(1)));
@@ -1962,8 +1968,9 @@ public void testPruneSSTFileValues() throws Exception {
     );
     byte[] compactionLogEntryKey = rocksDBCheckpointDiffer.addToCompactionLogTable(compactionLogEntry);
     rocksDBCheckpointDiffer.loadAllCompactionLogs();
+    assertEquals(1L, sstFilePruningMetrics.getPruneQueueSize());
 
-    // Pruning should not fail a source SST file has been removed by a another pruner.
+    // Pruning should not fail a source SST file has been removed by another pruner.
     Files.delete(sstBackUpDir.toPath().resolve(inputFile73 + SST_FILE_EXTENSION));
     // Run the SST file pruner.
     ManagedRawSSTFileIterator mockedRawSSTFileItr = mock(ManagedRawSSTFileIterator.class);
@@ -2011,6 +2018,12 @@ public void testPruneSSTFileValues() throws Exception {
 
     // Verify 000073.sst pruning has been skipped
     assertFalse(fileInfo73.isPruned());
+
+    assertEquals(0L, sstFilePruningMetrics.getPruneQueueSize());
+    assertEquals(1L, sstFilePruningMetrics.getFilesPrunedTotal());
+    assertEquals(1L, sstFilePruningMetrics.getFilesPrunedLast());
+    assertEquals(1L, sstFilePruningMetrics.getCompactionsProcessed());
+    assertEquals(1L, sstFilePruningMetrics.getFilesRemovedTotal());
   }
 
   private void createSSTFileWithKeys(String filePath, List<Pair<byte[], Integer>> keys)
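
The new test assertions read values back through the getters added for tests. An equivalent check could go through the metrics record itself; a sketch assuming hadoop-common's test utility org.apache.hadoop.test.MetricsAsserts is on the test classpath (this commit does not use it):

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.metrics2.MetricsRecordBuilder;

    // Snapshot the source once, then assert on the published metric names.
    MetricsRecordBuilder rb = getMetrics(rocksDBCheckpointDiffer.getPruningMetrics());
    assertCounter("FilesPrunedTotal", 1L, rb);
    assertGauge("FilesPrunedLast", 1L, rb);
    assertCounter("CompactionsProcessed", 1L, rb);
    assertGauge("PruneQueueSize", 0L, rb);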

