This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bf1d995b7f9 HDDS-13453. Add snapshot defrag metrics (#9788)
bf1d995b7f9 is described below
commit bf1d995b7f906bfe062bbf5f522d65aacb14f433
Author: Siyao Meng <[email protected]>
AuthorDate: Thu Feb 19 00:00:06 2026 -0800
HDDS-13453. Add snapshot defrag metrics (#9788)
Generated-by: GitHub Copilot with Claude Opus 4.6
---
.../hadoop/ozone/om/OMPerformanceMetrics.java | 14 ++
.../hadoop/ozone/om/OmSnapshotInternalMetrics.java | 94 +++++++++++++
.../om/snapshot/defrag/SnapshotDefragService.java | 82 +++++++++++-
.../snapshot/defrag/TestSnapshotDefragService.java | 149 +++++++++++++++++++++
4 files changed, 335 insertions(+), 4 deletions(-)
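In short: checkAndDefragSnapshot() now times each defragmentation pass, increments a per-mode failure counter when the pass throws, and increments the overall and per-mode success counters once it completes. A condensed sketch of that pattern, shown here for the full-defrag branch (runDefrag() is a hypothetical stand-in for the real performFullDefragmentation / performIncrementalDefragmentation calls; the exact wiring is in the diff below):

    long defragStart = Time.monotonicNow();
    try {
      runDefrag();  // hypothetical stand-in for the full or incremental pass
      // Gauge records the latency of the most recent run of this mode.
      perfMetrics.setSnapshotDefragServiceFullLatencyMs(Time.monotonicNow() - defragStart);
    } catch (IOException e) {
      snapshotMetrics.incNumSnapshotFullDefragFails();  // per-mode failure counter
      throw e;  // the outer catch in triggerSnapshotDefragOnce() bumps numSnapshotDefragFails
    }
    snapshotMetrics.incNumSnapshotDefrag();      // overall success counter
    snapshotMetrics.incNumSnapshotFullDefrag();  // per-mode success counter

The incremental branch is symmetric, using the *Inc* counterparts; in addition, incremental defrag counts delta files processed and full defrag counts tables compacted.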
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPerformanceMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPerformanceMetrics.java
index 12827f523da..9c031e1a9fc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPerformanceMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPerformanceMetrics.java
@@ -148,6 +148,12 @@ public class OMPerformanceMetrics {
@Metric(about = "Latency of each iteration of OpenKeyCleanupService in ms")
private MutableGaugeLong openKeyCleanupServiceLatencyMs;
+ @Metric(about = "Latency of the last snapshot full defragmentation operation
in ms")
+ private MutableGaugeLong snapshotDefragServiceFullLatencyMs;
+
+ @Metric(about = "Latency of the last snapshot incremental defragmentation
operation in ms")
+ private MutableGaugeLong snapshotDefragServiceIncLatencyMs;
+
@Metric(about = "ResolveBucketLink and ACL check latency for createKey in
nanoseconds")
private MutableRate createKeyResolveBucketAndAclCheckLatencyNs;
@@ -354,4 +360,12 @@ public void setKeyDeletingServiceLatencyMs(long latencyInMs) {
public void setOpenKeyCleanupServiceLatencyMs(long latencyInMs) {
openKeyCleanupServiceLatencyMs.set(latencyInMs);
}
+
+ public void setSnapshotDefragServiceFullLatencyMs(long latencyInMs) {
+ snapshotDefragServiceFullLatencyMs.set(latencyInMs);
+ }
+
+ public void setSnapshotDefragServiceIncLatencyMs(long latencyInMs) {
+ snapshotDefragServiceIncLatencyMs.set(latencyInMs);
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotInternalMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotInternalMetrics.java
index ce5896c1b97..5dc74690fac 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotInternalMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotInternalMetrics.java
@@ -51,6 +51,28 @@ public class OmSnapshotInternalMetrics {
@Metric("Total no. of snapshot move table keys failures")
private MutableCounterLong numSnapshotMoveTableKeysFails;
+ /*
+ * Snapshot defragmentation metrics since last restart.
+ */
+ @Metric("Total no. of snapshot defragmentation operations (full and
incremental)")
+ private MutableCounterLong numSnapshotDefrag;
+ @Metric("Total no. of snapshot defragmentation failures (full and
incremental)")
+ private MutableCounterLong numSnapshotDefragFails;
+ @Metric("Total no. of snapshots skipped for defragmentation (non-active or
already defragmented)")
+ private MutableCounterLong numSnapshotDefragSnapshotSkipped;
+ @Metric("Total no. of full defragmentation operations")
+ private MutableCounterLong numSnapshotFullDefrag;
+ @Metric("Total no. of full defragmentation failures")
+ private MutableCounterLong numSnapshotFullDefragFails;
+ @Metric("Total no. of tables compacted during full defragmentation")
+ private MutableCounterLong numSnapshotFullDefragTablesCompacted;
+ @Metric("Total no. of incremental defragmentation operations")
+ private MutableCounterLong numSnapshotIncDefrag;
+ @Metric("Total no. of incremental defragmentation failures")
+ private MutableCounterLong numSnapshotIncDefragFails;
+ @Metric("Total no. of delta files processed during incremental
defragmentation")
+ private MutableCounterLong numSnapshotIncDefragDeltaFilesProcessed;
+
public OmSnapshotInternalMetrics() {
this.registry = new MetricsRegistry(METRICS_SOURCE_NAME);
}
@@ -115,4 +137,76 @@ public long getNumSnapshotSetPropertyFails() {
public long getNumSnapshotMoveTableKeysFails() {
return numSnapshotMoveTableKeysFails.value();
}
+
+ public void incNumSnapshotDefrag() {
+ numSnapshotDefrag.incr();
+ }
+
+ public void incNumSnapshotDefragFails() {
+ numSnapshotDefragFails.incr();
+ }
+
+ public void incNumSnapshotDefragSnapshotSkipped() {
+ numSnapshotDefragSnapshotSkipped.incr();
+ }
+
+ public void incNumSnapshotFullDefrag() {
+ numSnapshotFullDefrag.incr();
+ }
+
+ public void incNumSnapshotFullDefragFails() {
+ numSnapshotFullDefragFails.incr();
+ }
+
+ public void incNumSnapshotFullDefragTablesCompacted(long count) {
+ numSnapshotFullDefragTablesCompacted.incr(count);
+ }
+
+ public void incNumSnapshotIncDefrag() {
+ numSnapshotIncDefrag.incr();
+ }
+
+ public void incNumSnapshotIncDefragFails() {
+ numSnapshotIncDefragFails.incr();
+ }
+
+ public void incNumSnapshotIncDefragDeltaFilesProcessed(long count) {
+ numSnapshotIncDefragDeltaFilesProcessed.incr(count);
+ }
+
+ public long getNumSnapshotDefrag() {
+ return numSnapshotDefrag.value();
+ }
+
+ public long getNumSnapshotDefragFails() {
+ return numSnapshotDefragFails.value();
+ }
+
+ public long getNumSnapshotDefragSnapshotSkipped() {
+ return numSnapshotDefragSnapshotSkipped.value();
+ }
+
+ public long getNumSnapshotFullDefrag() {
+ return numSnapshotFullDefrag.value();
+ }
+
+ public long getNumSnapshotFullDefragFails() {
+ return numSnapshotFullDefragFails.value();
+ }
+
+ public long getNumSnapshotFullDefragTablesCompacted() {
+ return numSnapshotFullDefragTablesCompacted.value();
+ }
+
+ public long getNumSnapshotIncDefrag() {
+ return numSnapshotIncDefrag.value();
+ }
+
+ public long getNumSnapshotIncDefragFails() {
+ return numSnapshotIncDefragFails.value();
+ }
+
+ public long getNumSnapshotIncDefragDeltaFilesProcessed() {
+ return numSnapshotIncDefragDeltaFilesProcessed.value();
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
index 87e0704d10a..cd3f845dcbe 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
@@ -45,6 +45,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -69,8 +70,10 @@
import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotInternalMetrics;
import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -134,6 +137,8 @@ public class SnapshotDefragService extends BackgroundService
private final List<UUID> lockIds;
private final CompositeDeltaDiffComputer deltaDiffComputer;
private final Path differTmpDir;
+ private final OmSnapshotInternalMetrics snapshotMetrics;
+ private final OMPerformanceMetrics perfMetrics;
public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout,
OzoneManager ozoneManager, OzoneConfiguration configuration) throws IOException {
@@ -166,6 +171,8 @@ public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout,
LOG.debug("Snapshot defragmentation diff status: {}", status);
}, false, !isRocksToolsNativeLibAvailable());
this.lockIds = new ArrayList<>(1);
+ this.snapshotMetrics = ozoneManager.getOmSnapshotIntMetrics();
+ this.perfMetrics = ozoneManager.getPerfMetrics();
}
@Override
@@ -266,6 +273,7 @@ private Pair<String, String> getTableBounds(Table<String, ?> table) throws Rocks
@VisibleForTesting
void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo prefixInfo,
Set<String> incrementalTables) throws IOException {
+ long tablesCompacted = 0;
for (String table : incrementalTables) {
Table<String, CodecBuffer> checkpointTable =
checkpointDBStore.getTable(table, StringCodec.get(),
CodecBufferCodec.get(true));
@@ -295,7 +303,9 @@ void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo prefi
compactRangeOptions.setChangeLevel(true);
checkpointDBStore.compactTable(table, compactRangeOptions);
}
+ tablesCompacted++;
}
+ snapshotMetrics.incNumSnapshotFullDefragTablesCompacted(tablesCompacted);
}
/**
@@ -382,6 +392,7 @@ void performIncrementalDefragmentation(SnapshotInfo previousSnapshotInfo, Snapsh
// Map of delta files grouped on the basis of the tableName.
Collection<Pair<Path, SstFileInfo>> allTableDeltaFiles = this.deltaDiffComputer.getDeltaFiles(
previousSnapshotInfo, snapshotInfo, incrementalTables);
+ snapshotMetrics.incNumSnapshotIncDefragDeltaFilesProcessed(allTableDeltaFiles.size());
Map<String, List<Path>> tableGroupedDeltaFiles = allTableDeltaFiles.stream()
.collect(Collectors.groupingBy(pair -> pair.getValue().getColumnFamily(),
@@ -574,6 +585,38 @@ OmMetadataManagerImpl createCheckpoint(SnapshotInfo snapshotInfo,
}
}
+ /**
+ * Logs disk space usage and SST file count for the given directory.
+ *
+ * @param label a descriptive label for the log message (e.g., "Before defrag", "After defrag")
+ * @param snapshotInfo the snapshot being defragmented
+ * @param dir the directory to inspect
+ */
+ private void logDirStats(String label, SnapshotInfo snapshotInfo, Path dir) {
+ try (Stream<Path> files = Files.list(dir)) {
+ long totalSize = 0;
+ long sstFileCount = 0;
+ long totalFileCount = 0;
+ java.util.Iterator<Path> it = files.iterator();
+ while (it.hasNext()) {
+ Path file = it.next();
+ if (Files.isRegularFile(file)) {
+ long fileSize = Files.size(file);
+ totalSize += fileSize;
+ totalFileCount++;
+ if (file.toString().endsWith(SST_FILE_EXTENSION)) {
+ sstFileCount++;
+ }
+ }
+ }
+ LOG.trace("{} snapshot {} (ID: {}): dir={}, totalFiles={}, sstFiles={},
diskUsage={} bytes",
+ label, snapshotInfo.getTableKey(), snapshotInfo.getSnapshotId(),
+ dir, totalFileCount, sstFileCount, totalSize);
+ } catch (IOException e) {
+ LOG.trace("Failed to collect directory stats for {}: {}", dir,
e.getMessage());
+ }
+ }
+
private void acquireContentLock(UUID snapshotID) throws IOException {
lockIds.clear();
lockIds.add(snapshotID);
@@ -591,10 +634,12 @@ boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, UUID snapshotI
if (snapshotInfo.getSnapshotStatus() != SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
LOG.debug("Skipping defragmentation for non-active snapshot: {} (ID:
{})",
snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+ snapshotMetrics.incNumSnapshotDefragSnapshotSkipped();
return false;
}
Pair<Boolean, Integer> needsDefragVersionPair = needsDefragmentation(snapshotInfo);
if (!needsDefragVersionPair.getLeft()) {
+ snapshotMetrics.incNumSnapshotDefragSnapshotSkipped();
return false;
}
LOG.info("Defragmenting snapshot: {} (ID: {})",
snapshotInfo.getTableKey(), snapshotInfo.getSnapshotId());
@@ -608,18 +653,37 @@ boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, UUID snapshotI
Path checkpointLocation = checkpointMetadataManager.getStore().getDbLocation().toPath();
try {
DBStore checkpointDBStore = checkpointMetadataManager.getStore();
+ if (LOG.isTraceEnabled()) {
+ Path snapshotDbDir = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getMetadataManager(), snapshotId, needsDefragVersionPair.getValue());
+ logDirStats("Before defrag", snapshotInfo, snapshotDbDir);
+ }
TablePrefixInfo prefixInfo = ozoneManager.getMetadataManager().getTableBucketPrefix(snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName());
// If first snapshot in the chain perform full defragmentation.
- if (snapshotInfo.getPathPreviousSnapshotId() == null) {
+ boolean isFullDefrag = snapshotInfo.getPathPreviousSnapshotId() == null;
+ long defragStart = Time.monotonicNow();
+ if (isFullDefrag) {
LOG.info("Performing full defragmentation for snapshot: {} (ID: {})",
snapshotInfo.getTableKey(),
snapshotInfo.getSnapshotId());
- performFullDefragmentation(checkpointDBStore, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ try {
+ performFullDefragmentation(checkpointDBStore, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ perfMetrics.setSnapshotDefragServiceFullLatencyMs(Time.monotonicNow() - defragStart);
+ } catch (IOException e) {
+ snapshotMetrics.incNumSnapshotFullDefragFails();
+ throw e;
+ }
} else {
LOG.info("Performing incremental defragmentation for snapshot: {} (ID:
{})", snapshotInfo.getTableKey(),
snapshotInfo.getSnapshotId());
- performIncrementalDefragmentation(checkpointSnapshotInfo, snapshotInfo, needsDefragVersionPair.getValue(),
- checkpointDBStore, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ try {
+ performIncrementalDefragmentation(checkpointSnapshotInfo, snapshotInfo, needsDefragVersionPair.getValue(),
+ checkpointDBStore, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ perfMetrics.setSnapshotDefragServiceIncLatencyMs(Time.monotonicNow() - defragStart);
+ } catch (IOException e) {
+ snapshotMetrics.incNumSnapshotIncDefragFails();
+ throw e;
+ }
}
int previousVersion;
// Acquire Content lock on the snapshot to ensure the contents of the table doesn't get changed.
@@ -628,6 +692,9 @@ boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, UUID snapshotI
// Ingestion of incremental tables KeyTable/FileTable/DirectoryTable done now we need to just reingest the
// remaining tables from the original snapshot.
ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ if (LOG.isTraceEnabled()) {
+ logDirStats("After defrag", snapshotInfo, checkpointLocation);
+ }
checkpointMetadataManager.close();
checkpointMetadataManager = null;
// Switch the snapshot DB location to the new version.
@@ -638,6 +705,12 @@ boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, UUID snapshotI
}
LOG.info("Completed defragmentation for snapshot: {} (ID: {}) in {} ms",
snapshotInfo.getTableKey(),
snapshotInfo.getSnapshotId(), Time.monotonicNow() - start);
+ snapshotMetrics.incNumSnapshotDefrag();
+ if (isFullDefrag) {
+ snapshotMetrics.incNumSnapshotFullDefrag();
+ } else {
+ snapshotMetrics.incNumSnapshotIncDefrag();
+ }
} finally {
if (checkpointMetadataManager != null) {
checkpointMetadataManager.close();
@@ -684,6 +757,7 @@ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
snapshotsDefraggedCount.getAndIncrement();
}
} catch (IOException e) {
+ snapshotMetrics.incNumSnapshotDefragFails();
LOG.error("Exception while defragmenting snapshot: {}", snapshotId, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
index 00c1b73398a..224de13840a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
@@ -24,6 +24,7 @@
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.VOLUME_TABLE;
+import static org.apache.hadoop.ozone.om.lock.DAGLeveledResource.BOOTSTRAP_LOCK;
import static org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_CONTENT_LOCK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -81,6 +82,7 @@
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.InMemoryTestTable;
+import org.apache.hadoop.hdds.utils.db.ManagedRawSSTFileReader;
import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter;
import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
@@ -90,8 +92,10 @@
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotInternalMetrics;
import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -150,6 +154,12 @@ public class TestSnapshotDefragService {
private DeltaFileComputer deltaFileComputer;
+ @Mock
+ private OmSnapshotInternalMetrics snapshotMetrics;
+
+ @Mock
+ private OMPerformanceMetrics perfMetrics;
+
@TempDir
private Path tempDir;
private OzoneConfiguration configuration;
@@ -170,6 +180,8 @@ public void setup() throws IOException {
when(ozoneManager.isRunning()).thenReturn(true);
when(ozoneManager.getVersionManager()).thenReturn(versionManager);
when(ozoneManager.getOmRatisServer()).thenReturn(mock(OzoneManagerRatisServer.class));
+ when(ozoneManager.getOmSnapshotIntMetrics()).thenReturn(snapshotMetrics);
+ when(ozoneManager.getPerfMetrics()).thenReturn(perfMetrics);
when(omSnapshotManager.getSnapshotLocalDataManager()).thenReturn(snapshotLocalDataManager);
when(metadataManager.getLock()).thenReturn(omLock);
@@ -340,6 +352,7 @@ public void testPerformFullDefragmentation() throws Exception {
assertEquals(dummyTableValues, nonCompactedTable.getValue().getMap());
}
}
+ verify(snapshotMetrics).incNumSnapshotFullDefragTablesCompacted(2L);
}
@Test
@@ -810,6 +823,7 @@ public String next() {
}
}
}
+ verify(snapshotMetrics).incNumSnapshotIncDefragDeltaFilesProcessed(3L);
}
}
@@ -822,9 +836,135 @@ public void testCheckAndDefragDeletedSnapshot() throws IOException {
mockedStatic.when(() -> SnapshotUtils.getSnapshotInfo(eq(ozoneManager), eq(chainManager),
eq(snapshotInfo.getSnapshotId()))).thenReturn(snapshotInfo);
assertFalse(defragService.checkAndDefragSnapshot(chainManager, snapshotInfo.getSnapshotId()));
+ verify(snapshotMetrics).incNumSnapshotDefragSnapshotSkipped();
+ }
+ }
+
+ @Test
+ public void testCheckAndDefragAlreadyDefraggedSnapshot() throws IOException {
+ SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1");
+ SnapshotChainManager chainManager = mock(SnapshotChainManager.class);
+ SnapshotDefragService spyDefragService = Mockito.spy(defragService);
+ try (MockedStatic<SnapshotUtils> mockedStatic = Mockito.mockStatic(SnapshotUtils.class)) {
+ mockedStatic.when(() -> SnapshotUtils.getSnapshotInfo(eq(ozoneManager), eq(chainManager),
+ eq(snapshotInfo.getSnapshotId()))).thenReturn(snapshotInfo);
+ doReturn(Pair.of(false, 0)).when(spyDefragService).needsDefragmentation(eq(snapshotInfo));
+ assertFalse(spyDefragService.checkAndDefragSnapshot(chainManager, snapshotInfo.getSnapshotId()));
+ verify(snapshotMetrics).incNumSnapshotDefragSnapshotSkipped();
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCheckAndDefragSnapshotFailure(boolean previousSnapshotExists) throws IOException {
+ // Test metrics numSnapshotFullDefragFails and numSnapshotIncDefragFails
+
+ SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap2");
+ SnapshotInfo previousSnapshotInfo;
+ if (previousSnapshotExists) {
+ previousSnapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1");
+ snapshotInfo.setPathPreviousSnapshotId(previousSnapshotInfo.getSnapshotId());
+ } else {
+ previousSnapshotInfo = null;
+ }
+
+ SnapshotChainManager chainManager = mock(SnapshotChainManager.class);
+ try (MockedStatic<SnapshotUtils> mockedStatic = Mockito.mockStatic(SnapshotUtils.class)) {
+ mockedStatic.when(() -> SnapshotUtils.getSnapshotInfo(eq(ozoneManager), eq(chainManager),
+ eq(snapshotInfo.getSnapshotId()))).thenReturn(snapshotInfo);
+ if (previousSnapshotExists) {
+ mockedStatic.when(() -> SnapshotUtils.getSnapshotInfo(eq(ozoneManager), eq(chainManager),
+ eq(previousSnapshotInfo.getSnapshotId()))).thenReturn(previousSnapshotInfo);
+ }
+ SnapshotDefragService spyDefragService = Mockito.spy(defragService);
+ doReturn(Pair.of(true, 10)).when(spyDefragService).needsDefragmentation(eq(snapshotInfo));
+ OmMetadataManagerImpl checkpointMetadataManager = mock(OmMetadataManagerImpl.class);
+ File checkpointPath = tempDir.resolve("checkpoint").toAbsolutePath().toFile();
+ DBStore checkpointDBStore = mock(DBStore.class);
+ SnapshotInfo checkpointSnapshotInfo = previousSnapshotExists ? previousSnapshotInfo : snapshotInfo;
+ when(checkpointMetadataManager.getStore()).thenReturn(checkpointDBStore);
+ when(checkpointDBStore.getDbLocation()).thenReturn(checkpointPath);
+ doReturn(checkpointMetadataManager).when(spyDefragService).createCheckpoint(eq(checkpointSnapshotInfo),
+ eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+ TablePrefixInfo prefixInfo = new TablePrefixInfo(Collections.emptyMap());
+ when(metadataManager.getTableBucketPrefix(eq(snapshotInfo.getVolumeName()), eq(snapshotInfo.getBucketName())))
+ .thenReturn(prefixInfo);
+
+ // Make the defrag operation throw IOException
+ IOException defragException = new IOException("Defrag failed");
+ if (previousSnapshotExists) {
+ Mockito.doThrow(defragException).when(spyDefragService).performIncrementalDefragmentation(
+ eq(previousSnapshotInfo), eq(snapshotInfo), eq(10), eq(checkpointDBStore), eq(prefixInfo),
+ eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+ } else {
+ Mockito.doThrow(defragException).when(spyDefragService).performFullDefragmentation(
+ eq(checkpointDBStore), eq(prefixInfo), eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+ }
+
+ IOException thrown = org.junit.jupiter.api.Assertions.assertThrows(IOException.class,
+ () -> spyDefragService.checkAndDefragSnapshot(chainManager, snapshotInfo.getSnapshotId()));
+ assertEquals(defragException, thrown);
+
+ // Verify specific failure metric was incremented
+ if (previousSnapshotExists) {
+ verify(snapshotMetrics).incNumSnapshotIncDefragFails();
+ } else {
+ verify(snapshotMetrics).incNumSnapshotFullDefragFails();
+ }
+ // Verify success metrics were NOT incremented
+ verify(snapshotMetrics, Mockito.never()).incNumSnapshotDefrag();
+ verify(snapshotMetrics, Mockito.never()).incNumSnapshotFullDefrag();
+ verify(snapshotMetrics, Mockito.never()).incNumSnapshotIncDefrag();
+ // Verify checkpoint was closed in finally block
+ verify(checkpointMetadataManager).close();
+ }
+ }
+
+ @Test
+ public void testTriggerSnapshotDefragOnceFailure() throws IOException, InterruptedException {
+ // Test metric numSnapshotDefragFails
+
+ UUID snapshotId = UUID.randomUUID();
+ SnapshotDefragService spyDefragService = Mockito.spy(defragService);
+ SnapshotChainManager chainManager = mock(SnapshotChainManager.class);
+ when(metadataManager.getSnapshotChainManager()).thenReturn(chainManager);
+ when(chainManager.iterator(false)).thenReturn(
+ Collections.singletonList(snapshotId).iterator());
+
+ // Enable the service to run
+ spyDefragService.resume();
+
+ // Mock bootstrap lock to return a no-op closeable
+ when(omLock.acquireLock(eq(BOOTSTRAP_LOCK), anyString(), eq(true)))
+ .thenReturn(new UncheckedAutoCloseableSupplier<OMLockDetails>() {
+ @Override
+ public OMLockDetails get() {
+ return OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
+ }
+
+ @Override
+ public void close() {
+ }
+ });
+
+ // Stub checkAndDefragSnapshot to throw IOException
+ Mockito.doThrow(new IOException("Defrag failed")).when(spyDefragService)
+ .checkAndDefragSnapshot(eq(chainManager), eq(snapshotId));
+
+ try (MockedStatic<ManagedRawSSTFileReader> mockedNativeLib =
+ Mockito.mockStatic(ManagedRawSSTFileReader.class)) {
+ mockedNativeLib.when(ManagedRawSSTFileReader::tryLoadLibrary).thenReturn(true);
+
+ // triggerSnapshotDefragOnce catches the IOException internally
+ assertTrue(spyDefragService.triggerSnapshotDefragOnce());
+ }
+
+ // Verify the general failure metric was incremented in the outer catch
+ verify(snapshotMetrics).incNumSnapshotDefragFails();
+ // Verify the snapshot was not counted as defragged
+ assertEquals(0, spyDefragService.getSnapshotsDefraggedCount().get());
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCheckAndDefragActiveSnapshot(boolean previousSnapshotExists) throws IOException {
@@ -921,6 +1061,15 @@ public void testCheckAndDefragActiveSnapshot(boolean previousSnapshotExists) thr
verifier.verify(spyDefragService).atomicSwitchSnapshotDB(eq(snapshotInfo.getSnapshotId()),
eq(checkpointPath.toPath()));
verifier.verify(omSnapshotManager).deleteSnapshotCheckpointDirectories(eq(snapshotInfo.getSnapshotId()),
eq(20));
+ // Verify metrics
+ verify(snapshotMetrics).incNumSnapshotDefrag();
+ if (previousSnapshotExists) {
+ verify(snapshotMetrics).incNumSnapshotIncDefrag();
+ verify(perfMetrics).setSnapshotDefragServiceIncLatencyMs(Mockito.anyLong());
+ } else {
+ verify(snapshotMetrics).incNumSnapshotFullDefrag();
+ verify(perfMetrics).setSnapshotDefragServiceFullLatencyMs(Mockito.anyLong());
+ }
}
}
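For anyone who wants to watch the new counters on a live OM: Hadoop metrics2 publishes each @Metric field of a registered source as an MBean attribute (attribute names are the field names with the first letter capitalized). A minimal sketch, assuming the source is registered under the "Hadoop" JMX domain with service "OzoneManager" and the OmSnapshotInternalMetrics source name; verify both against the OM's /jmx servlet, and run in-process or swap in a JMXConnector for remote access:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public final class ReadDefragMetrics {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed ObjectName; check the OM's /jmx output for the exact name.
        ObjectName src = new ObjectName("Hadoop:service=OzoneManager,name=OmSnapshotInternalMetrics");
        System.out.println("defrag runs:    " + mbs.getAttribute(src, "NumSnapshotDefrag"));
        System.out.println("defrag fails:   " + mbs.getAttribute(src, "NumSnapshotDefragFails"));
        System.out.println("defrag skipped: " + mbs.getAttribute(src, "NumSnapshotDefragSnapshotSkipped"));
      }
    }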
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]