This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 96ce0b647e0 HDDS-13780. Skeleton of background snapshot defrag service (#9133)
96ce0b647e0 is described below
commit 96ce0b647e02736f0eb25d3e0d9afd82271ce021
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Oct 14 13:06:10 2025 -0700
HDDS-13780. Skeleton of background snapshot defrag service (#9133)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 9 +
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 1 +
.../common/src/main/resources/ozone-default.xml | 22 ++
.../hadoop/hdds/utils/db/RDBSstFileWriter.java | 15 +-
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 11 +
.../org/apache/hadoop/ozone/om/KeyManager.java | 6 +
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 73 ++++-
.../hadoop/ozone/om/OmSnapshotLocalDataYaml.java | 9 +-
.../hadoop/ozone/om/SnapshotDefragService.java | 363 +++++++++++++++++++++
9 files changed, 501 insertions(+), 8 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 1d47fb72958..db66fed22fe 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -277,6 +277,15 @@ public final class OzoneConfigKeys {
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT = "300s";
// 300s for default
+ public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT =
+ "ozone.snapshot.defrag.service.timeout";
+ public static final String
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT = "300s";
+ // TODO: Adjust timeout as needed.
+ // One concern would be that snapdiff can take a long time.
+ // If snapdiff wait time is included in the timeout it can make it non-deterministic.
+ // -- So don't wait? Trigger and check later?
+
public static final String OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL =
"ozone.snapshot.deleting.service.interval";
public static final String
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index c9064da1781..cb4490c2c1d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -520,6 +520,7 @@ public final class OzoneConsts {
public static final String OM_SNAPSHOT_DIR = "db.snapshots";
public static final String OM_SNAPSHOT_CHECKPOINT_DIR = OM_SNAPSHOT_DIR
+ OM_KEY_PREFIX + "checkpointState";
+ public static final String OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR = "checkpointStateDefragged";
public static final String OM_SNAPSHOT_DIFF_DIR = OM_SNAPSHOT_DIR
+ OM_KEY_PREFIX + "diffState";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 02f1e71bfcf..b200c0b5bf1 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3758,6 +3758,14 @@
Snapshot Deleting Service per run.
</description>
</property>
+ <property>
+ <name>ozone.snapshot.defrag.limit.per.task</name>
+ <value>1</value>
+ <tag>OZONE, PERFORMANCE, OM</tag>
+ <description>The maximum number of snapshots to be defragmented in
+ each run of the snapshot defragmentation service.
+ </description>
+ </property>
<property>
<name>ozone.snapshot.filtering.service.interval</name>
<value>1m</value>
@@ -3765,6 +3773,13 @@
<description>Time interval of the SST File filtering service from Snapshot.
</description>
</property>
+ <property>
+ <name>ozone.snapshot.defrag.service.interval</name>
+ <value>-1</value>
+ <tag>OZONE, PERFORMANCE, OM</tag>
+ <description>Task interval of the snapshot defragmentation service. A value less than or equal to zero disables the service.
+ </description>
+ </property>
<property>
<name>ozone.om.snapshot.checkpoint.dir.creation.poll.timeout</name>
<value>20s</value>
@@ -3781,6 +3796,13 @@
<description>A timeout value of sst filtering service.
</description>
</property>
+ <property>
+ <name>ozone.snapshot.defrag.service.timeout</name>
+ <value>300s</value>
+ <tag>OZONE, PERFORMANCE, OM</tag>
+ <description>Timeout value for a run of the snapshot defragmentation service.
+ </description>
+ </property>
<property>
<name>ozone.filesystem.snapshot.enabled</name>
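With the defaults above the defrag service stays off, since the interval ships as -1. A minimal sketch of enabling it programmatically; the class name and the one-hour interval are illustrative choices for this example, not part of the commit.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    final class DefragConfigExample {
      static OzoneConfiguration enableDefrag() {
        OzoneConfiguration conf = new OzoneConfiguration();
        // A positive interval enables scheduling; the shipped default "-1" keeps it off.
        conf.setTimeDuration("ozone.snapshot.defrag.service.interval", 1, TimeUnit.HOURS);
        // Per-run timeout; 300s matches the shipped default.
        conf.setTimeDuration("ozone.snapshot.defrag.service.timeout", 300, TimeUnit.SECONDS);
        // At most one snapshot defragmented per task run (shipped default).
        conf.setInt("ozone.snapshot.defrag.limit.per.task", 1);
        return conf;
      }
    }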
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index e84854cae44..5aa561ba948 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -28,7 +28,7 @@
/**
* DumpFileWriter using rocksdb sst files.
*/
-class RDBSstFileWriter implements Closeable {
+public class RDBSstFileWriter implements Closeable {
private ManagedSstFileWriter sstFileWriter;
private File sstFile;
@@ -36,7 +36,7 @@ class RDBSstFileWriter implements Closeable {
private ManagedOptions emptyOption = new ManagedOptions();
private final ManagedEnvOptions emptyEnvOptions = new ManagedEnvOptions();
- RDBSstFileWriter(File externalFile) throws RocksDatabaseException {
+ public RDBSstFileWriter(File externalFile) throws RocksDatabaseException {
this.sstFileWriter = new ManagedSstFileWriter(emptyEnvOptions, emptyOption);
this.keyCounter = new AtomicLong(0);
this.sstFile = externalFile;
@@ -60,6 +60,17 @@ public void put(byte[] key, byte[] value) throws RocksDatabaseException {
}
}
+ public void delete(byte[] key) throws RocksDatabaseException {
+ try {
+ sstFileWriter.delete(key);
+ keyCounter.incrementAndGet();
+ } catch (RocksDBException e) {
+ closeOnFailure();
+ throw new RocksDatabaseException("Failed to delete key (length=" + key.length
+ + "), sstFile=" + sstFile.getAbsolutePath(), e);
+ }
+ }
+
@Override
public void close() throws RocksDatabaseException {
if (sstFileWriter != null) {
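The now-public writer is meant to be driven directly by the defrag flow. A minimal usage sketch, assuming keys are supplied in ascending order as RocksDB SST writers require, and relying on close() to finalize the file; the helper class, path and keys are illustrative only.

    package org.apache.hadoop.hdds.utils.db;

    import java.io.File;
    import java.nio.charset.StandardCharsets;

    // Illustrative helper placed next to RDBSstFileWriter so package-level types resolve.
    final class SstWriterExample {
      static void writeSample(File out) throws RocksDatabaseException {
        // Keys must be added in strictly ascending order; close() finalizes the SST file.
        try (RDBSstFileWriter writer = new RDBSstFileWriter(out)) {
          writer.put("a-key".getBytes(StandardCharsets.UTF_8),
              "a-value".getBytes(StandardCharsets.UTF_8));
          writer.delete("b-key".getBytes(StandardCharsets.UTF_8)); // tombstone for a removed key
        }
      }
    }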
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 87bc9cb3017..969288ed92c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -428,11 +428,22 @@ public final class OMConfigKeys {
"ozone.snapshot.deleting.limit.per.task";
public static final int SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT = 10;
+ // Snapshot defragmentation service configuration
+ public static final String SNAPSHOT_DEFRAG_LIMIT_PER_TASK =
+ "ozone.snapshot.defrag.limit.per.task";
+ public static final int SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT = 1;
+
public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL =
"ozone.snapshot.filtering.service.interval";
public static final String
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT = "60s";
+ public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL =
+ "ozone.snapshot.defrag.service.interval";
+ public static final String
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT = "-1";
+ // TODO: Disabled by default. Do not enable by default until upgrade handling is complete.
+
public static final String
OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT =
"ozone.om.snapshot.checkpoint.dir.creation.poll.timeout";
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 7e76885c49b..872a99e94b1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -346,6 +346,12 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
*/
SstFilteringService getSnapshotSstFilteringService();
+ /**
+ * Returns the instance of Snapshot Defrag service.
+ * @return Background service.
+ */
+ SnapshotDefragService getSnapshotDefragService();
+
/**
* Returns the instance of Snapshot Deleting service.
* @return Background service.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 02949d5ee74..e458fa73236 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -30,6 +30,8 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
@@ -58,6 +60,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
@@ -202,6 +206,7 @@ public class KeyManagerImpl implements KeyManager {
private KeyDeletingService keyDeletingService;
private SstFilteringService snapshotSstFilteringService;
+ private SnapshotDefragService snapshotDefragService;
private SnapshotDeletingService snapshotDeletingService;
private final KeyProviderCryptoExtension kmsProvider;
@@ -310,6 +315,11 @@ public void start(OzoneConfiguration configuration) {
startSnapshotSstFilteringService(configuration);
}
+ if (snapshotDefragService == null &&
+ ozoneManager.isFilesystemSnapshotEnabled()) {
+ startSnapshotDefragService(configuration);
+ }
+
if (snapshotDeletingService == null &&
ozoneManager.isFilesystemSnapshotEnabled()) {
@@ -393,6 +403,42 @@ public void stopSnapshotSstFilteringService() {
}
}
+ /**
+ * Start the snapshot defrag service if the interval is not set to a disabled value.
+ * @param conf Ozone configuration
+ */
+ public void startSnapshotDefragService(OzoneConfiguration conf) {
+ if (isDefragSvcEnabled()) {
+ long serviceInterval = conf.getTimeDuration(
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL,
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = conf.getTimeDuration(
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT,
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ snapshotDefragService =
+ new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS,
+ serviceTimeout, ozoneManager, conf);
+ snapshotDefragService.start();
+ } else {
+ LOG.info("SnapshotDefragService is disabled. Snapshot defragmentation
will not run periodically.");
+ }
+ }
+
+ /**
+ * Stop the snapshot defrag service if it is running.
+ */
+ public void stopSnapshotDefragService() {
+ if (snapshotDefragService != null) {
+ snapshotDefragService.shutdown();
+ snapshotDefragService = null;
+ } else {
+ LOG.info("SnapshotDefragService is already stopped or not started.");
+ }
+ }
+
private void startCompactionService(OzoneConfiguration configuration,
boolean isCompactionServiceEnabled) {
if (compactionService == null && isCompactionServiceEnabled) {
@@ -419,7 +465,7 @@ KeyProviderCryptoExtension getKMSProvider() {
}
@Override
- public void stop() throws IOException {
+ public void stop() {
if (keyDeletingService != null) {
keyDeletingService.shutdown();
keyDeletingService = null;
@@ -436,6 +482,10 @@ public void stop() throws IOException {
snapshotSstFilteringService.shutdown();
snapshotSstFilteringService = null;
}
+ if (snapshotDefragService != null) {
+ snapshotDefragService.shutdown();
+ snapshotDefragService = null;
+ }
if (snapshotDeletingService != null) {
snapshotDeletingService.shutdown();
snapshotDeletingService = null;
@@ -450,6 +500,16 @@ public void stop() throws IOException {
}
}
+ /**
+ * Get the SnapshotDefragService instance.
+ *
+ * @return SnapshotDefragService instance, or null if not initialized
+ */
+ @Override
+ public SnapshotDefragService getSnapshotDefragService() {
+ return snapshotDefragService;
+ }
+
private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
throws IOException {
String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
@@ -973,7 +1033,16 @@ public boolean isSstFilteringSvcEnabled() {
// any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay
return serviceInterval > 0;
}
-
+
+ public boolean isDefragSvcEnabled() {
+ long serviceInterval = ozoneManager.getConfiguration()
+ .getTimeDuration(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL,
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay
+ return serviceInterval > 0;
+ }
+
@Override
public OmMultipartUploadList listMultipartUploads(String volumeName,
String bucketName,
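The enable check and the accessor mirror the existing SST filtering service wiring. A short sketch of reading the defrag state back through KeyManagerImpl; the helper class and the `keyManager` handle are assumptions of the example, e.g. something a test harness would provide.

    package org.apache.hadoop.ozone.om;

    final class DefragStatusExample {
      // Returns how many snapshots the background service has defragged so far,
      // or 0 if the service is disabled (interval <= 0) or not yet started.
      static long defraggedSoFar(KeyManagerImpl keyManager) {
        if (!keyManager.isDefragSvcEnabled()
            || keyManager.getSnapshotDefragService() == null) {
          return 0;
        }
        return keyManager.getSnapshotDefragService().getSnapshotsDefraggedCount().get();
      }
    }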
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
index 543c4c6397c..c376e9a332c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
@@ -164,10 +164,11 @@ private final class ConstructSnapshotLocalData extends AbstractConstruct {
public Object construct(Node node) {
MappingNode mnode = (MappingNode) node;
Map<Object, Object> nodes = constructMapping(mnode);
- UUID snapId = UUID.fromString((String) nodes.get(OzoneConsts.OM_SLD_SNAP_ID));
- UUID prevSnapId = UUID.fromString((String) nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID));
- OmSnapshotLocalData snapshotLocalData = new OmSnapshotLocalData(snapId, Collections.emptyList(),
- prevSnapId);
+ final String snapIdStr = (String) nodes.get(OzoneConsts.OM_SLD_SNAP_ID);
+ UUID snapId = UUID.fromString(snapIdStr);
+ final String prevSnapIdStr = (String) nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID);
+ UUID prevSnapId = prevSnapIdStr != null ? UUID.fromString(prevSnapIdStr) : null;
+ OmSnapshotLocalData snapshotLocalData = new OmSnapshotLocalData(snapId, Collections.emptyList(), prevSnapId);
// Set version from YAML
Integer version = (Integer) nodes.get(OzoneConsts.OM_SLD_VERSION);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
new file mode 100644
index 00000000000..436593b861b
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may contain
+ * fragmented data. This service defragments snapshots by creating new compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ * <p>
+ * The service processes snapshots in the active chain sequentially, starting with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent snapshots,
+ * it uses incremental defragmentation based on diffs from the previous defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+ implements BootstrapStateHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDefragService.class);
+
+ // Use only a single thread for snapshot defragmentation to avoid conflicts
+ private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+ private final AtomicLong runCount = new AtomicLong(0);
+
+ // Number of snapshots to be processed in a single iteration
+ private final long snapshotLimitPerTask;
+
+ private final AtomicLong snapshotsDefraggedCount;
+ private final AtomicBoolean running;
+
+ private final MultiSnapshotLocks snapshotIdLocks;
+
+ private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock();
+
+ public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration
+ .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+ SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+ snapshotsDefraggedCount = new AtomicLong(0);
+ running = new AtomicBoolean(false);
+ IOzoneManagerLock omLock = ozoneManager.getMetadataManager().getLock();
+ this.snapshotIdLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, true);
+ }
+
+ @Override
+ public void start() {
+ running.set(true);
+ super.start();
+ }
+
+ @VisibleForTesting
+ public void pause() {
+ running.set(false);
+ }
+
+ @VisibleForTesting
+ public void resume() {
+ running.set(true);
+ }
+
+ /**
+ * Checks if rocks-tools native library is available.
+ */
+ private boolean isRocksToolsNativeLibAvailable() {
+ try {
+ return ManagedRawSSTFileReader.tryLoadLibrary();
+ } catch (Exception e) {
+ LOG.warn("Failed to check native code availability", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ */
+ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ try {
+ // Read snapshot local metadata from YAML
+ OmSnapshotLocalData snapshotLocalData = ozoneManager.getOmSnapshotManager()
+ .getSnapshotLocalDataManager()
+ .getOmSnapshotLocalData(snapshotInfo);
+
+ // Check if snapshot needs compaction (defragmentation)
+ boolean needsDefrag = snapshotLocalData.getNeedsDefrag();
+ LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+ snapshotInfo.getName(), needsDefrag);
+
+ return needsDefrag;
+ } catch (IOException e) {
+ LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
+ snapshotInfo.getName(), e);
+ return true;
+ }
+ }
+
+ /**
+ * Performs full defragmentation for the first snapshot in the chain.
+ * This is a simplified implementation that demonstrates the concept.
+ */
+ private void performFullDefragmentation(SnapshotInfo snapshotInfo,
+ OmSnapshot omSnapshot) throws IOException {
+
+ // TODO: Implement full defragmentation
+ }
+
+ /**
+ * Performs incremental defragmentation using diff from previous defragmented snapshot.
+ */
+ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
+ SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+ throws IOException {
+
+ // TODO: Implement incremental defragmentation
+ }
+
+ private final class SnapshotDefragTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ // Check OM leader and readiness
+ if (shouldRun()) {
+ final long count = runCount.incrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating Snapshot Defragmentation Task: run # {}",
count);
+ }
+ triggerSnapshotDefragOnce();
+ }
+
+ return EmptyTaskResult.newResult();
+ }
+ }
+
+ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
+ // Check if rocks-tools native lib is available
+ if (!isRocksToolsNativeLibAvailable()) {
+ LOG.warn("Rocks-tools native library is not available. " +
+ "Stopping SnapshotDefragService.");
+ return false;
+ }
+
+ Optional<OmSnapshotManager> snapshotManager = Optional.ofNullable(ozoneManager)
+ .map(OzoneManager::getOmSnapshotManager);
+ if (!snapshotManager.isPresent()) {
+ LOG.debug("OmSnapshotManager not available, skipping defragmentation task");
+ return false;
+ }
+
+ // Get the SnapshotChainManager to iterate through the global snapshot chain
+ final SnapshotChainManager snapshotChainManager =
+ ((OmMetadataManagerImpl) ozoneManager.getMetadataManager()).getSnapshotChainManager();
+
+ final Table<String, SnapshotInfo> snapshotInfoTable =
+ ozoneManager.getMetadataManager().getSnapshotInfoTable();
+
+ // Use iterator(false) to iterate forward through the snapshot chain
+ Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
+
+ long snapshotLimit = snapshotLimitPerTask;
+
+ while (snapshotLimit > 0 && running.get() && snapshotIterator.hasNext()) {
+ // Get SnapshotInfo for the current snapshot in the chain
+ UUID snapshotId = snapshotIterator.next();
+ String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
+ SnapshotInfo snapshotToDefrag = snapshotInfoTable.get(snapshotTableKey);
+ if (snapshotToDefrag == null) {
+ LOG.warn("Snapshot with ID '{}' not found in snapshot info table",
snapshotId);
+ continue;
+ }
+
+ // Skip deleted snapshots
+ if (snapshotToDefrag.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+ LOG.debug("Skipping deleted snapshot: {} (ID: {})",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+ continue;
+ }
+
+ // Check if this snapshot needs defragmentation
+ if (!needsDefragmentation(snapshotToDefrag)) {
+ LOG.debug("Skipping already defragged snapshot: {} (ID: {})",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+ continue;
+ }
+
+ LOG.info("Will defrag snapshot: {} (ID: {})",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+ // Acquire MultiSnapshotLocks
+ if (!snapshotIdLocks.acquireLock(Collections.singletonList(snapshotToDefrag.getSnapshotId()))
+ .isLockAcquired()) {
+ LOG.error("Abort. Failed to acquire lock on snapshot: {} (ID: {})",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+ break;
+ }
+
+ try {
+ LOG.info("Processing snapshot defragmentation for: {} (ID: {})",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+ // Get snapshot through SnapshotCache for proper locking
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier =
+ snapshotManager.get().getSnapshot(snapshotToDefrag.getSnapshotId())) {
+
+ OmSnapshot omSnapshot = snapshotSupplier.get();
+
+ UUID pathPreviousSnapshotId = snapshotToDefrag.getPathPreviousSnapshotId();
+ boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null;
+ if (isFirstSnapshotInPath) {
+ LOG.info("Performing full defragmentation for first snapshot (in path): {}",
+ snapshotToDefrag.getName());
+ performFullDefragmentation(snapshotToDefrag, omSnapshot);
+ } else {
+ final String psIdtableKey = snapshotChainManager.getTableKey(pathPreviousSnapshotId);
+ SnapshotInfo previousDefraggedSnapshot = snapshotInfoTable.get(psIdtableKey);
+
+ LOG.info("Performing incremental defragmentation for snapshot: {} " +
+ "based on previous defragmented snapshot: {}",
+ snapshotToDefrag.getName(), previousDefraggedSnapshot.getName());
+
+ // If previous path snapshot is not null, it must have been defragmented already
+ // Sanity check to ensure previous snapshot exists and is defragmented
+ if (needsDefragmentation(previousDefraggedSnapshot)) {
+ LOG.error("Fatal error before defragging snapshot: {}. " +
+ "Previous snapshot in path {} was not defragged while it is expected to be.",
+ snapshotToDefrag.getName(), previousDefraggedSnapshot.getName());
+ break;
+ }
+
+ performIncrementalDefragmentation(snapshotToDefrag,
+ previousDefraggedSnapshot, omSnapshot);
+ }
+
+ // TODO: Update snapshot metadata here?
+
+ // Close and evict the original snapshot DB from SnapshotCache
+ // TODO: Implement proper eviction from SnapshotCache
+ LOG.info("Defragmentation completed for snapshot: {}",
+ snapshotToDefrag.getName());
+
+ snapshotLimit--;
+ snapshotsDefraggedCount.getAndIncrement();
+
+ } catch (OMException ome) {
+ if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
+ LOG.info("Snapshot {} was deleted during defragmentation",
+ snapshotToDefrag.getName());
+ } else {
+ LOG.error("OMException during snapshot defragmentation for: {}",
+ snapshotToDefrag.getName(), ome);
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Exception during snapshot defragmentation for: {}",
+ snapshotToDefrag.getName(), e);
+ return false;
+ } finally {
+ // Release lock MultiSnapshotLocks
+ snapshotIdLocks.releaseLock();
+ LOG.debug("Released MultiSnapshotLocks on snapshot: {} (ID: {})",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ // TODO: Can be parallelized for different buckets
+ queue.add(new SnapshotDefragTask());
+ return queue;
+ }
+
+ /**
+ * Returns true if the service run conditions are satisfied, false otherwise.
+ */
+ private boolean shouldRun() {
+ if (ozoneManager == null) {
+ // OzoneManager can be null for testing
+ return true;
+ }
+ if (ozoneManager.getOmRatisServer() == null) {
+ LOG.warn("OzoneManagerRatisServer is not initialized yet");
+ return false;
+ }
+ // The service only runs if current OM node is ready
+ return running.get() && ozoneManager.isRunning();
+ }
+
+ public AtomicLong getSnapshotsDefraggedCount() {
+ return snapshotsDefraggedCount;
+ }
+
+ @Override
+ public BootstrapStateHandler.Lock getBootstrapStateLock() {
+ return lock;
+ }
+
+ @Override
+ public void shutdown() {
+ running.set(false);
+ super.shutdown();
+ }
+}
+
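A minimal sketch of exercising the skeleton directly, for example from a test. The `om` handle, the helper class name, and the interval/timeout values are assumptions of the example, not part of the commit; since both defrag steps are still TODO stubs, a run currently only walks the chain and updates counters.

    package org.apache.hadoop.ozone.om;

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    final class SnapshotDefragServiceExample {
      static long runOnce(OzoneManager om) throws IOException {
        OzoneConfiguration conf = om.getConfiguration();
        // interval = 60s, per-run timeout = 300s; both values are arbitrary here.
        SnapshotDefragService defrag = new SnapshotDefragService(
            60_000, TimeUnit.MILLISECONDS, 300_000, om, conf);
        defrag.start();
        try {
          defrag.triggerSnapshotDefragOnce();   // synchronous single pass over the chain
          return defrag.getSnapshotsDefraggedCount().get();
        } finally {
          defrag.shutdown();
        }
      }
    }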
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]