This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 96ce0b647e0 HDDS-13780. Skeleton of background snapshot defrag service (#9133)
96ce0b647e0 is described below

commit 96ce0b647e02736f0eb25d3e0d9afd82271ce021
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Oct 14 13:06:10 2025 -0700

    HDDS-13780. Skeleton of background snapshot defrag service (#9133)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   9 +
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../common/src/main/resources/ozone-default.xml    |  22 ++
 .../hadoop/hdds/utils/db/RDBSstFileWriter.java     |  15 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  11 +
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   6 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  73 ++++-
 .../hadoop/ozone/om/OmSnapshotLocalDataYaml.java   |   9 +-
 .../hadoop/ozone/om/SnapshotDefragService.java     | 363 +++++++++++++++++++++
 9 files changed, 501 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 1d47fb72958..db66fed22fe 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -277,6 +277,15 @@ public final class OzoneConfigKeys {
       OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT = "300s";
       // 300s for default
 
+  public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT =
+      "ozone.snapshot.defrag.service.timeout";
+  public static final String
+      OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT = "300s";
+  // TODO: Adjust timeout as needed.
+  //  One concern would be that snapdiff can take a long time.
+  //  If snapdiff wait time is included in the timeout, it can make the run duration non-deterministic.
+  //  -- So don't wait? Trigger and check later?
+
   public static final String OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL =
       "ozone.snapshot.deleting.service.interval";
   public static final String
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index c9064da1781..cb4490c2c1d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -520,6 +520,7 @@ public final class OzoneConsts {
   public static final String OM_SNAPSHOT_DIR = "db.snapshots";
   public static final String OM_SNAPSHOT_CHECKPOINT_DIR = OM_SNAPSHOT_DIR
       + OM_KEY_PREFIX + "checkpointState";
+  public static final String OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR = "checkpointStateDefragged";
   public static final String OM_SNAPSHOT_DIFF_DIR = OM_SNAPSHOT_DIR
       + OM_KEY_PREFIX + "diffState";
 
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 02f1e71bfcf..b200c0b5bf1 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3758,6 +3758,14 @@
       Snapshot Deleting Service per run.
     </description>
   </property>
+  <property>
+    <name>ozone.snapshot.defrag.limit.per.task</name>
+    <value>1</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>The maximum number of snapshots that would be defragmented in
+      each task run of the snapshot defragmentation service.
+    </description>
+  </property>
   <property>
     <name>ozone.snapshot.filtering.service.interval</name>
     <value>1m</value>
@@ -3765,6 +3773,13 @@
     <description>Time interval of the SST File filtering service from Snapshot.
     </description>
   </property>
+  <property>
+    <name>ozone.snapshot.defrag.service.interval</name>
+    <value>-1</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>Task interval of the snapshot defragmentation service. Any value less than or equal to zero disables the service.
+    </description>
+  </property>
   <property>
     <name>ozone.om.snapshot.checkpoint.dir.creation.poll.timeout</name>
     <value>20s</value>
@@ -3781,6 +3796,13 @@
     <description>A timeout value of sst filtering service.
     </description>
   </property>
+  <property>
+    <name>ozone.snapshot.defrag.service.timeout</name>
+    <value>300s</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>Timeout value for a run of the snapshot defragmentation service.
+    </description>
+  </property>
 
   <property>
     <name>ozone.filesystem.snapshot.enabled</name>
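For illustration only (not part of the commit itself): the interval default of -1 leaves the defrag service disabled, and the KeyManagerImpl change further down only treats an interval greater than zero as enabled. A minimal sketch of that check, using a hypothetical class name and an arbitrary 10-minute interval:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    public final class DefragConfigSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Give the service a positive interval; the shipped default is -1 (disabled).
        conf.setTimeDuration("ozone.snapshot.defrag.service.interval", 10, TimeUnit.MINUTES);

        long interval = conf.getTimeDuration(
            "ozone.snapshot.defrag.service.interval", "-1", TimeUnit.MILLISECONDS);
        // Same rule as isDefragSvcEnabled below: any interval <= 0 keeps the service off.
        System.out.println("snapshot defrag service enabled: " + (interval > 0));
      }
    }
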
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index e84854cae44..5aa561ba948 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -28,7 +28,7 @@
 /**
  * DumpFileWriter using rocksdb sst files.
  */
-class RDBSstFileWriter implements Closeable {
+public class RDBSstFileWriter implements Closeable {
 
   private ManagedSstFileWriter sstFileWriter;
   private File sstFile;
@@ -36,7 +36,7 @@ class RDBSstFileWriter implements Closeable {
   private ManagedOptions emptyOption = new ManagedOptions();
   private final ManagedEnvOptions emptyEnvOptions = new ManagedEnvOptions();
 
-  RDBSstFileWriter(File externalFile) throws RocksDatabaseException {
+  public RDBSstFileWriter(File externalFile) throws RocksDatabaseException {
     this.sstFileWriter = new ManagedSstFileWriter(emptyEnvOptions, emptyOption);
     this.keyCounter = new AtomicLong(0);
     this.sstFile = externalFile;
@@ -60,6 +60,17 @@ public void put(byte[] key, byte[] value) throws RocksDatabaseException {
     }
   }
 
+  public void delete(byte[] key) throws RocksDatabaseException {
+    try {
+      sstFileWriter.delete(key);
+      keyCounter.incrementAndGet();
+    } catch (RocksDBException e) {
+      closeOnFailure();
+      throw new RocksDatabaseException("Failed to delete key (length=" + key.length
+          + "), sstFile=" + sstFile.getAbsolutePath(), e);
+    }
+  }
+
   @Override
   public void close() throws RocksDatabaseException {
     if (sstFileWriter != null) {
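For illustration only (not part of the commit): with RDBSstFileWriter and its constructor now public, and the new delete() emitting tombstones, an external SST file could be produced roughly as below. The file path and keys are made up, and keys are written in ascending order, which RocksDB SST writers require:

    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter;

    public final class SstWriterSketch {
      public static void main(String[] args) throws Exception {
        File out = new File("/tmp/defrag-sketch.sst");  // hypothetical output path
        try (RDBSstFileWriter writer = new RDBSstFileWriter(out)) {
          // put() adds a regular key/value entry to the SST file.
          writer.put("key-a".getBytes(StandardCharsets.UTF_8),
              "value-a".getBytes(StandardCharsets.UTF_8));
          // The new delete() adds a tombstone for a key, e.g. one removed between snapshots.
          writer.delete("key-b".getBytes(StandardCharsets.UTF_8));
        }  // close() finishes the SST file.
      }
    }
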
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 87bc9cb3017..969288ed92c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -428,11 +428,22 @@ public final class OMConfigKeys {
       "ozone.snapshot.deleting.limit.per.task";
   public static final int SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT = 10;
 
+  // Snapshot defragmentation service configuration
+  public static final String SNAPSHOT_DEFRAG_LIMIT_PER_TASK =
+      "ozone.snapshot.defrag.limit.per.task";
+  public static final int SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT = 1;
+
   public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL =
       "ozone.snapshot.filtering.service.interval";
   public static final String
       OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT = "60s";
 
+  public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL =
+      "ozone.snapshot.defrag.service.interval";
+  public static final String
+      OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT = "-1";
+  // TODO: Disabled by default. Do not enable by default until upgrade handling is complete.
+
   public static final String
       OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT =
       "ozone.om.snapshot.checkpoint.dir.creation.poll.timeout";
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 7e76885c49b..872a99e94b1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -346,6 +346,12 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
    */
   SstFilteringService getSnapshotSstFilteringService();
 
+  /**
+   * Returns the instance of Snapshot Defrag service.
+   * @return Background service.
+   */
+  SnapshotDefragService getSnapshotDefragService();
+
   /**
    * Returns the instance of Snapshot Deleting service.
    * @return Background service.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 02949d5ee74..e458fa73236 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -30,6 +30,8 @@
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
@@ -58,6 +60,8 @@
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
@@ -202,6 +206,7 @@ public class KeyManagerImpl implements KeyManager {
   private KeyDeletingService keyDeletingService;
 
   private SstFilteringService snapshotSstFilteringService;
+  private SnapshotDefragService snapshotDefragService;
   private SnapshotDeletingService snapshotDeletingService;
 
   private final KeyProviderCryptoExtension kmsProvider;
@@ -310,6 +315,11 @@ public void start(OzoneConfiguration configuration) {
       startSnapshotSstFilteringService(configuration);
     }
 
+    if (snapshotDefragService == null &&
+        ozoneManager.isFilesystemSnapshotEnabled()) {
+      startSnapshotDefragService(configuration);
+    }
+
     if (snapshotDeletingService == null &&
         ozoneManager.isFilesystemSnapshotEnabled()) {
 
@@ -393,6 +403,42 @@ public void stopSnapshotSstFilteringService() {
     }
   }
 
+  /**
+   * Start the snapshot defrag service if the interval is not set to a disabled value.
+   * @param conf the Ozone configuration
+   */
+  public void startSnapshotDefragService(OzoneConfiguration conf) {
+    if (isDefragSvcEnabled()) {
+      long serviceInterval = conf.getTimeDuration(
+          OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL,
+          OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      long serviceTimeout = conf.getTimeDuration(
+          OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT,
+          OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+
+      snapshotDefragService =
+          new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS,
+              serviceTimeout, ozoneManager, conf);
+      snapshotDefragService.start();
+    } else {
+      LOG.info("SnapshotDefragService is disabled. Snapshot defragmentation 
will not run periodically.");
+    }
+  }
+
+  /**
+   * Stop the snapshot defrag service if it is running.
+   */
+  public void stopSnapshotDefragService() {
+    if (snapshotDefragService != null) {
+      snapshotDefragService.shutdown();
+      snapshotDefragService = null;
+    } else {
+      LOG.info("SnapshotDefragService is already stopped or not started.");
+    }
+  }
+
   private void startCompactionService(OzoneConfiguration configuration,
                                       boolean isCompactionServiceEnabled) {
     if (compactionService == null && isCompactionServiceEnabled) {
@@ -419,7 +465,7 @@ KeyProviderCryptoExtension getKMSProvider() {
   }
 
   @Override
-  public void stop() throws IOException {
+  public void stop() {
     if (keyDeletingService != null) {
       keyDeletingService.shutdown();
       keyDeletingService = null;
@@ -436,6 +482,10 @@ public void stop() throws IOException {
       snapshotSstFilteringService.shutdown();
       snapshotSstFilteringService = null;
     }
+    if (snapshotDefragService != null) {
+      snapshotDefragService.shutdown();
+      snapshotDefragService = null;
+    }
     if (snapshotDeletingService != null) {
       snapshotDeletingService.shutdown();
       snapshotDeletingService = null;
@@ -450,6 +500,16 @@ public void stop() throws IOException {
     }
   }
 
+  /**
+   * Get the SnapshotDefragService instance.
+   *
+   * @return SnapshotDefragService instance, or null if not initialized
+   */
+  @Override
+  public SnapshotDefragService getSnapshotDefragService() {
+    return snapshotDefragService;
+  }
+
   private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
       throws IOException {
     String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
@@ -973,7 +1033,16 @@ public boolean isSstFilteringSvcEnabled() {
     // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay
     return serviceInterval > 0;
   }
-  
+
+  public boolean isDefragSvcEnabled() {
+    long serviceInterval = ozoneManager.getConfiguration()
+        .getTimeDuration(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL,
+            OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay
+    return serviceInterval > 0;
+  }
+
   @Override
   public OmMultipartUploadList listMultipartUploads(String volumeName,
       String bucketName,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
index 543c4c6397c..c376e9a332c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
@@ -164,10 +164,11 @@ private final class ConstructSnapshotLocalData extends AbstractConstruct {
       public Object construct(Node node) {
         MappingNode mnode = (MappingNode) node;
         Map<Object, Object> nodes = constructMapping(mnode);
-        UUID snapId = UUID.fromString((String) nodes.get(OzoneConsts.OM_SLD_SNAP_ID));
-        UUID prevSnapId = UUID.fromString((String) nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID));
-        OmSnapshotLocalData snapshotLocalData = new OmSnapshotLocalData(snapId, Collections.emptyList(),
-            prevSnapId);
+        final String snapIdStr = (String) nodes.get(OzoneConsts.OM_SLD_SNAP_ID);
+        UUID snapId = UUID.fromString(snapIdStr);
+        final String prevSnapIdStr = (String) nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID);
+        UUID prevSnapId = prevSnapIdStr != null ? UUID.fromString(prevSnapIdStr) : null;
+        OmSnapshotLocalData snapshotLocalData = new OmSnapshotLocalData(snapId, Collections.emptyList(), prevSnapId);
 
         // Set version from YAML
         Integer version = (Integer) nodes.get(OzoneConsts.OM_SLD_VERSION);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
new file mode 100644
index 00000000000..436593b861b
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may contain
+ * fragmented data. This service defragments snapshots by creating new compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ * <p>
+ * The service processes snapshots in the active chain sequentially, starting with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent snapshots,
+ * it uses incremental defragmentation based on diffs from the previous defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+    implements BootstrapStateHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDefragService.class);
+
+  // Use only a single thread for snapshot defragmentation to avoid conflicts
+  private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+  private final AtomicLong runCount = new AtomicLong(0);
+
+  // Number of snapshots to be processed in a single iteration
+  private final long snapshotLimitPerTask;
+
+  private final AtomicLong snapshotsDefraggedCount;
+  private final AtomicBoolean running;
+
+  private final MultiSnapshotLocks snapshotIdLocks;
+
+  private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock();
+
+  public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
+        serviceTimeout, ozoneManager.getThreadNamePrefix());
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+            SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+    snapshotsDefraggedCount = new AtomicLong(0);
+    running = new AtomicBoolean(false);
+    IOzoneManagerLock omLock = ozoneManager.getMetadataManager().getLock();
+    this.snapshotIdLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, true);
+  }
+
+  @Override
+  public void start() {
+    running.set(true);
+    super.start();
+  }
+
+  @VisibleForTesting
+  public void pause() {
+    running.set(false);
+  }
+
+  @VisibleForTesting
+  public void resume() {
+    running.set(true);
+  }
+
+  /**
+   * Checks if rocks-tools native library is available.
+   */
+  private boolean isRocksToolsNativeLibAvailable() {
+    try {
+      return ManagedRawSSTFileReader.tryLoadLibrary();
+    } catch (Exception e) {
+      LOG.warn("Failed to check native code availability", e);
+      return false;
+    }
+  }
+
+  /**
+   * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+   */
+  private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+    String snapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), snapshotInfo);
+
+    try {
+      // Read snapshot local metadata from YAML
+      OmSnapshotLocalData snapshotLocalData = ozoneManager.getOmSnapshotManager()
+          .getSnapshotLocalDataManager()
+          .getOmSnapshotLocalData(snapshotInfo);
+
+      // Check if snapshot needs compaction (defragmentation)
+      boolean needsDefrag = snapshotLocalData.getNeedsDefrag();
+      LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+          snapshotInfo.getName(), needsDefrag);
+
+      return needsDefrag;
+    } catch (IOException e) {
+      LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag 
needed",
+          snapshotInfo.getName(), e);
+      return true;
+    }
+  }
+
+  /**
+   * Performs full defragmentation for the first snapshot in the chain.
+   * This is a simplified implementation that demonstrates the concept.
+   */
+  private void performFullDefragmentation(SnapshotInfo snapshotInfo,
+      OmSnapshot omSnapshot) throws IOException {
+
+    // TODO: Implement full defragmentation
+  }
+
+  /**
+   * Performs incremental defragmentation using diff from previous defragmented snapshot.
+   */
+  private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
+      SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+      throws IOException {
+
+    // TODO: Implement incremental defragmentation
+  }
+
+  private final class SnapshotDefragTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        final long count = runCount.incrementAndGet();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initiating Snapshot Defragmentation Task: run # {}", 
count);
+        }
+        triggerSnapshotDefragOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
+    // Check if rocks-tools native lib is available
+    if (!isRocksToolsNativeLibAvailable()) {
+      LOG.warn("Rocks-tools native library is not available. " +
+          "Stopping SnapshotDefragService.");
+      return false;
+    }
+
+    Optional<OmSnapshotManager> snapshotManager = Optional.ofNullable(ozoneManager)
+        .map(OzoneManager::getOmSnapshotManager);
+    if (!snapshotManager.isPresent()) {
+      LOG.debug("OmSnapshotManager not available, skipping defragmentation 
task");
+      return false;
+    }
+
+    // Get the SnapshotChainManager to iterate through the global snapshot chain
+    final SnapshotChainManager snapshotChainManager =
+        ((OmMetadataManagerImpl) ozoneManager.getMetadataManager()).getSnapshotChainManager();
+
+    final Table<String, SnapshotInfo> snapshotInfoTable =
+        ozoneManager.getMetadataManager().getSnapshotInfoTable();
+
+    // Use iterator(false) to iterate forward through the snapshot chain
+    Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
+
+    long snapshotLimit = snapshotLimitPerTask;
+
+    while (snapshotLimit > 0 && running.get() && snapshotIterator.hasNext()) {
+      // Get SnapshotInfo for the current snapshot in the chain
+      UUID snapshotId = snapshotIterator.next();
+      String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
+      SnapshotInfo snapshotToDefrag = snapshotInfoTable.get(snapshotTableKey);
+      if (snapshotToDefrag == null) {
+        LOG.warn("Snapshot with ID '{}' not found in snapshot info table", 
snapshotId);
+        continue;
+      }
+
+      // Skip deleted snapshots
+      if (snapshotToDefrag.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+        LOG.debug("Skipping deleted snapshot: {} (ID: {})",
+            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+        continue;
+      }
+
+      // Check if this snapshot needs defragmentation
+      if (!needsDefragmentation(snapshotToDefrag)) {
+        LOG.debug("Skipping already defragged snapshot: {} (ID: {})",
+            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+        continue;
+      }
+
+      LOG.info("Will defrag snapshot: {} (ID: {})",
+          snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+      // Acquire MultiSnapshotLocks
+      if (!snapshotIdLocks.acquireLock(Collections.singletonList(snapshotToDefrag.getSnapshotId()))
+          .isLockAcquired()) {
+        LOG.error("Abort. Failed to acquire lock on snapshot: {} (ID: {})",
+            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+        break;
+      }
+
+      try {
+        LOG.info("Processing snapshot defragmentation for: {} (ID: {})",
+            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+        // Get snapshot through SnapshotCache for proper locking
+        try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier =
+                 snapshotManager.get().getSnapshot(snapshotToDefrag.getSnapshotId())) {
+
+          OmSnapshot omSnapshot = snapshotSupplier.get();
+
+          UUID pathPreviousSnapshotId = snapshotToDefrag.getPathPreviousSnapshotId();
+          boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null;
+          if (isFirstSnapshotInPath) {
+            LOG.info("Performing full defragmentation for first snapshot (in 
path): {}",
+                snapshotToDefrag.getName());
+            performFullDefragmentation(snapshotToDefrag, omSnapshot);
+          } else {
+            final String psIdtableKey = snapshotChainManager.getTableKey(pathPreviousSnapshotId);
+            SnapshotInfo previousDefraggedSnapshot = snapshotInfoTable.get(psIdtableKey);
+
+            LOG.info("Performing incremental defragmentation for snapshot: {} 
" +
+                    "based on previous defragmented snapshot: {}",
+                snapshotToDefrag.getName(), 
previousDefraggedSnapshot.getName());
+
+            // If previous path snapshot is not null, it must have been defragmented already
+            // Sanity check to ensure previous snapshot exists and is defragmented
+            if (needsDefragmentation(previousDefraggedSnapshot)) {
+              LOG.error("Fatal error before defragging snapshot: {}. " +
+                      "Previous snapshot in path {} was not defragged while it is expected to be.",
+                  snapshotToDefrag.getName(), previousDefraggedSnapshot.getName());
+              break;
+            }
+
+            performIncrementalDefragmentation(snapshotToDefrag,
+                previousDefraggedSnapshot, omSnapshot);
+          }
+
+          // TODO: Update snapshot metadata here?
+
+          // Close and evict the original snapshot DB from SnapshotCache
+          // TODO: Implement proper eviction from SnapshotCache
+          LOG.info("Defragmentation completed for snapshot: {}",
+              snapshotToDefrag.getName());
+
+          snapshotLimit--;
+          snapshotsDefraggedCount.getAndIncrement();
+
+        } catch (OMException ome) {
+          if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
+            LOG.info("Snapshot {} was deleted during defragmentation",
+                snapshotToDefrag.getName());
+          } else {
+            LOG.error("OMException during snapshot defragmentation for: {}",
+                snapshotToDefrag.getName(), ome);
+          }
+        }
+
+      } catch (Exception e) {
+        LOG.error("Exception during snapshot defragmentation for: {}",
+            snapshotToDefrag.getName(), e);
+        return false;
+      } finally {
+        // Release lock MultiSnapshotLocks
+        snapshotIdLocks.releaseLock();
+        LOG.debug("Released MultiSnapshotLocks on snapshot: {} (ID: {})",
+            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    // TODO: Can be parallelized for different buckets
+    queue.add(new SnapshotDefragTask());
+    return queue;
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    if (ozoneManager.getOmRatisServer() == null) {
+      LOG.warn("OzoneManagerRatisServer is not initialized yet");
+      return false;
+    }
+    // The service only runs if current OM node is ready
+    return running.get() && ozoneManager.isRunning();
+  }
+
+  public AtomicLong getSnapshotsDefraggedCount() {
+    return snapshotsDefraggedCount;
+  }
+
+  @Override
+  public BootstrapStateHandler.Lock getBootstrapStateLock() {
+    return lock;
+  }
+
+  @Override
+  public void shutdown() {
+    running.set(false);
+    super.shutdown();
+  }
+}
+
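For illustration only (not part of the commit): the service is normally wired up through KeyManagerImpl above, but the constructor and the public trigger method also allow a one-off run in a test-style setting. A sketch assuming an already running OzoneManager instance named om and arbitrary interval/timeout values:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.om.OzoneManager;
    import org.apache.hadoop.ozone.om.SnapshotDefragService;

    public final class DefragServiceSketch {
      // 'om' is assumed to come from a test harness such as a mini Ozone cluster.
      static void runOnce(OzoneManager om, OzoneConfiguration conf) throws Exception {
        SnapshotDefragService svc = new SnapshotDefragService(
            10, TimeUnit.MINUTES,  // background run interval
            300_000,               // service timeout in milliseconds
            om, conf);
        try {
          svc.start();                      // sets the running flag and starts the scheduler
          svc.triggerSnapshotDefragOnce();  // walks the chain once, up to the per-task limit
          System.out.println("snapshots defragged: " + svc.getSnapshotsDefraggedCount().get());
        } finally {
          svc.shutdown();
        }
      }
    }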


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
