This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 299b2948095 HDDS-13009. Background snapshot defrag service (#9324)
299b2948095 is described below

commit 299b294809521712a37e33def5ac6717985409b7
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Nov 24 23:22:48 2025 -0500

    HDDS-13009. Background snapshot defrag service (#9324)
---
 .../org/apache/hadoop/hdds/utils/db/DBStore.java   |   8 +
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |  24 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   8 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  12 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  20 +-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |  60 ++
 .../response/snapshot/OMSnapshotPurgeResponse.java |  40 +-
 .../hadoop/ozone/om/snapshot/SnapshotCache.java    |  24 +-
 .../om/snapshot/defrag/SnapshotDefragService.java  | 619 ++++++++++++++++-----
 .../snapshot/defrag/TestSnapshotDefragService.java | 212 +++++++
 .../snapshot/diff/delta/TestRDBDifferComputer.java |   2 +
 11 files changed, 844 insertions(+), 185 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index 0fb91f42d90..f71ffe42197 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -145,6 +145,12 @@ <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
    */
   Map<Integer, String> getTableNames();
 
+  /**
+   * Drops the specified table by removing its column family.
+   * @param tableName - Name of the table to drop.
+   */
+  void dropTable(String tableName) throws RocksDatabaseException;
+
   /**
    * Get data written to DB since a specific sequence number.
    */
@@ -162,4 +168,6 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long 
limitCount)
    * @return true if the DB is closed.
    */
   boolean isClosed();
+
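+  /**
+   * @return the parent directory that contains all the snapshot DB checkpoints.
+   */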
+  String getSnapshotsParentDir();
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index e3853a84211..8e2e2b85426 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -189,6 +189,7 @@ public String getSnapshotMetadataDir() {
     return dbLocation.getParent() + OM_KEY_PREFIX + OM_SNAPSHOT_DIFF_DIR;
   }
 
+  @Override
   public String getSnapshotsParentDir() {
     return snapshotsParentDir;
   }
@@ -332,6 +333,29 @@ public Map<Integer, String> getTableNames() {
     return db.getColumnFamilyNames();
   }
 
+  /**
+   * Drops a table from the database by removing its associated column family.
+   * <p>
+   * <b>Warning:</b> This operation should be used with extreme caution. If the table needs to be used again,
+   * it is recommended to reinitialize the entire DB store, as the column family will be permanently
+   * removed from the database. This method is suitable for truncating a RocksDB column family in a single operation.
+   *
+   * @param tableName the name of the table to be dropped
+   * @throws RocksDatabaseException if an error occurs while attempting to drop the table
+   */
+  @Override
+  public void dropTable(String tableName) throws RocksDatabaseException {
+    ColumnFamily columnFamily = db.getColumnFamily(tableName);
+    if (columnFamily != null) {
+      try {
+        db.getManagedRocksDb().get().dropColumnFamily(columnFamily.getHandle());
+      } catch (RocksDBException e) {
+        throw new RocksDatabaseException("Failed to drop " + tableName, e);
+      }
+    }
+  }
+
   public Collection<ColumnFamily> getColumnFamilies() {
     return db.getExtraColumnFamilies();
   }
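
For reference, the plain RocksDB operation that dropTable wraps looks roughly like the standalone sketch below (rocksdbjni only, not Ozone code; the DB path and column family name are invented). Dropping a column family removes all of its keys in a single metadata operation, which is why it can serve as a cheap truncate, but the family has to be recreated, or the store reopened, before the table name can be used again.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class DropCfSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    byte[] cfName = "keyTable".getBytes(StandardCharsets.UTF_8);
    List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
        new ColumnFamilyDescriptor(cfName));
    List<ColumnFamilyHandle> handles = new ArrayList<>();
    try (DBOptions options = new DBOptions()
            .setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
         RocksDB db = RocksDB.open(options, "/tmp/drop-cf-sketch", descriptors, handles)) {
      // Dropping the column family removes every key in it in one metadata operation,
      // instead of iterating and deleting keys one by one.
      db.dropColumnFamily(handles.get(1));
      handles.get(1).close();
      // Recreate the (now empty) column family so the table name can be reused.
      ColumnFamilyHandle recreated = db.createColumnFamily(new ColumnFamilyDescriptor(cfName));
      recreated.close();
      handles.get(0).close();
    }
  }
}
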
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index baac362da74..461324d0022 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -21,6 +21,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
@@ -85,6 +86,13 @@ public interface OMMetadataManager extends DBStoreHAManager, 
AutoCloseable {
   @VisibleForTesting
   DBStore getStore();
 
+  /**
+   * Retrieves the parent directory of all the snapshots in the system.
+   *
+   * @return a Path object representing the parent directory of the snapshots.
+   */
+  Path getSnapshotParentDir();
+
   /**
    * Returns the OzoneManagerLock used on Metadata DB.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index b5137ed3d61..a0dd69a9752 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -421,10 +421,14 @@ public void startSnapshotDefragService(OzoneConfiguration 
conf) {
           OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT,
           TimeUnit.MILLISECONDS);
 
-      snapshotDefragService =
-          new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS,
-              serviceTimeout, ozoneManager, conf);
-      snapshotDefragService.start();
+      try {
+        snapshotDefragService =
+            new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS,
+                serviceTimeout, ozoneManager, conf);
+        snapshotDefragService.start();
+      } catch (IOException e) {
+        LOG.error("Error starting Snapshot Defrag Service", e);
+      }
     } else {
       LOG.info("SnapshotDefragService is disabled. Snapshot defragmentation 
will not run periodically.");
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 6e79ca25ac8..aeaf860c964 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -234,6 +234,11 @@ protected OmMetadataManagerImpl() {
 
   public static OmMetadataManagerImpl createCheckpointMetadataManager(
       OzoneConfiguration conf, DBCheckpoint checkpoint) throws IOException {
+    return createCheckpointMetadataManager(conf, checkpoint, true);
+  }
+
+  public static OmMetadataManagerImpl createCheckpointMetadataManager(
+      OzoneConfiguration conf, DBCheckpoint checkpoint, boolean readOnly) 
throws IOException {
     Path path = checkpoint.getCheckpointLocation();
     Path parent = path.getParent();
     if (parent == null) {
@@ -246,7 +251,11 @@ public static OmMetadataManagerImpl 
createCheckpointMetadataManager(
       throw new IllegalStateException("DB checkpoint dir name should not "
           + "have been null. Checkpoint path is " + path);
     }
-    return new OmMetadataManagerImpl(conf, dir, name.toString());
+    return new OmMetadataManagerImpl(conf, dir, name.toString(), readOnly);
+  }
+
+  protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String 
name) throws IOException {
+    this(conf, dir, name, true);
   }
 
   /**
@@ -257,7 +266,7 @@ public static OmMetadataManagerImpl 
createCheckpointMetadataManager(
    * @param name - Checkpoint directory name.
    * @throws IOException
    */
-  protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String 
name)
+  protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String 
name, boolean readOnly)
       throws IOException {
     lock = new OmReadOnlyLock();
     hierarchicalLockManager = new ReadOnlyHierarchicalResourceLockManager();
@@ -265,7 +274,7 @@ protected OmMetadataManagerImpl(OzoneConfiguration conf, 
File dir, String name)
     int maxOpenFiles = conf.getInt(OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, 
OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
 
     this.store = newDBStoreBuilder(conf, name, dir)
-        .setOpenReadOnly(true)
+        .setOpenReadOnly(readOnly)
         .disableDefaultCFAutoCompaction(true)
         .setMaxNumberOfOpenFiles(maxOpenFiles)
         .setEnableCompactionDag(false)
@@ -519,6 +528,11 @@ public DBStore getStore() {
     return store;
   }
 
+  @Override
+  public Path getSnapshotParentDir() {
+    return Paths.get(store.getSnapshotsParentDir());
+  }
+
   /**
    * Given a volume return the corresponding DB key.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index c3b9feae77f..a1c7aa918fa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.om;
 
+import static org.apache.commons.io.file.PathUtils.deleteDirectory;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
 import static 
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
@@ -97,6 +98,7 @@
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
@@ -512,6 +514,64 @@ public void invalidateCacheEntry(UUID key) {
     }
   }
 
+  /**
+   * Deletes the snapshot checkpoint directories for a given snapshot ID up to
+   * the specified maximum version. This method ensures that all directories
+   * containing checkpoint data for the specified snapshot ID up to the max
+   * version are removed in a controlled manner.
+   *
+   * @param snapshotId The unique identifier of the snapshot whose checkpoint
+   *                   directories are to be deleted.
+   * @param maxVersion The maximum version of checkpoint directories to delete.
+   *                   If a value less than 0 is provided, it defaults to the
+   *                   current maximum version of the snapshot.
+   * @throws IOException If there is a failure acquiring the snapshot database
+   *                     lock or while deleting directories.
+   * @throws IllegalArgumentException If the specified maxVersion is greater
+   *                                  than the current maximum version of the
+   *                                  snapshot.
+   */
+  public void deleteSnapshotCheckpointDirectories(UUID snapshotId, int maxVersion) throws IOException {
+    // Acquire the snapshot DB handle lock before removing the older versions to ensure all readers are done
+    // using the snapshot DB.
+    try (UncheckedAutoCloseableSupplier<OMLockDetails> lock = getSnapshotCache().lock(snapshotId)) {
+      if (!lock.get().isLockAcquired()) {
+        throw new IOException("Failed to acquire dbHandle lock on snapshot: " + snapshotId);
+      }
+      try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataMetaProvider snapMetaProvider =
+               snapshotLocalDataManager.getOmSnapshotLocalDataMeta(snapshotId)) {
+        if (maxVersion < 0) {
+          maxVersion = snapMetaProvider.getMeta().getVersion();
+        }
+        if (maxVersion > snapMetaProvider.getMeta().getVersion()) {
+          throw new IllegalArgumentException(
+              String.format("Max Version to be deleted can never be greater than the existing " +
+                  "version of the snapshot. Argument passed : %d and snapshotMaxVersion : %d", maxVersion,
+                  snapMetaProvider.getMeta().getVersion()));
+        }
+        // Binary-search for the smallest existing version and delete the older versions starting from the
+        // smallest one. This is to ensure efficient crash recovery.
+        int smallestExistingVersion = 0;
+        int largestExistingVersion = maxVersion;
+        while (smallestExistingVersion <= largestExistingVersion) {
+          int midVersion = smallestExistingVersion + (largestExistingVersion - smallestExistingVersion) / 2;
+          Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, midVersion);
+          if (path.toFile().exists()) {
+            largestExistingVersion = midVersion - 1;
+          } else {
+            smallestExistingVersion = midVersion + 1;
+          }
+        }
+        // Delete the older version directories, always deleting from the smallest version to the largest so
+        // that the binary search above remains correct on subsequent runs.
+        for (int version = smallestExistingVersion; version <= maxVersion; version++) {
+          Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, version);
+          deleteDirectory(path);
+        }
+      }
+    }
+  }
+
   /**
    * Creates snapshot checkpoint that corresponds to snapshotInfo.
    * @param omMetadataManager the metadata manager
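
The version-directory cleanup above relies on two invariants: deleted versions always form a contiguous prefix (because deletion proceeds from the smallest version upward), so a binary search over directory existence finds the smallest surviving version, and a crash in the middle of the loop leaves the same invariant intact for the next run. A minimal standalone sketch of that pattern, assuming a hypothetical "<base>-<version>" directory layout rather than Ozone's real snapshot paths:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.commons.io.file.PathUtils;

public final class VersionedDirCleanupSketch {
  // Hypothetical naming scheme used only for this sketch.
  static Path versionDir(Path base, int version) {
    return Paths.get(base.toString() + "-" + version);
  }

  public static void deleteUpTo(Path base, int maxVersion) throws IOException {
    int lo = 0;
    int hi = maxVersion;
    // Invariant: every version below 'lo' is already gone; the smallest surviving version is in [lo, hi + 1].
    while (lo <= hi) {
      int mid = lo + (hi - lo) / 2;
      if (Files.exists(versionDir(base, mid))) {
        hi = mid - 1;   // an existing dir means the smallest surviving version is at or below mid
      } else {
        lo = mid + 1;   // mid is already deleted, so the smallest surviving version is above mid
      }
    }
    // 'lo' is now the smallest surviving version; delete upward so older dirs vanish first.
    for (int v = lo; v <= maxVersion; v++) {
      Path dir = versionDir(base, v);
      if (Files.exists(dir)) {
        PathUtils.deleteDirectory(dir);
      }
    }
  }
}
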
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
index 3bc8a8dc27b..852a7a63723 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
@@ -18,22 +18,18 @@
 package org.apache.hadoop.ozone.om.response.snapshot;
 
 import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_INFO_TABLE;
-import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK;
 
 import com.google.common.annotations.VisibleForTesting;
 import jakarta.annotation.Nonnull;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
@@ -95,10 +91,9 @@ protected void addToDBBatch(OMMetadataManager 
omMetadataManager,
       if (snapshotInfo == null) {
         continue;
       }
-
+      OmSnapshotManager omSnapshotManager = 
metadataManager.getOzoneManager().getOmSnapshotManager();
       // Remove and close snapshot's RocksDB instance from SnapshotCache.
-      ((OmMetadataManagerImpl) 
omMetadataManager).getOzoneManager().getOmSnapshotManager()
-          .invalidateCacheEntry(snapshotInfo.getSnapshotId());
+      omSnapshotManager.invalidateCacheEntry(snapshotInfo.getSnapshotId());
       // Remove the snapshot from snapshotId to snapshotTableKey map.
       ((OmMetadataManagerImpl) omMetadataManager).getSnapshotChainManager()
           .removeFromSnapshotIdToTable(snapshotInfo.getSnapshotId());
@@ -109,7 +104,8 @@ protected void addToDBBatch(OMMetadataManager 
omMetadataManager,
       // snapshot purged txn is flushed to rocksdb.
       updateLocalData(snapshotLocalDataManager, snapshotInfo);
       // Delete Snapshot checkpoint directory.
-      deleteCheckpointDirectory(snapshotLocalDataManager, omMetadataManager, 
snapshotInfo);
+
+      
omSnapshotManager.deleteSnapshotCheckpointDirectories(snapshotInfo.getSnapshotId(),
 -1);
       // Delete snapshotInfo from the table.
       omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation, 
dbKey);
     }
@@ -133,34 +129,6 @@ private void updateLocalData(OmSnapshotLocalDataManager 
localDataManager, Snapsh
     }
   }
 
-  /**
-   * Deletes the checkpoint directory for a snapshot.
-   */
-  private void deleteCheckpointDirectory(OmSnapshotLocalDataManager 
snapshotLocalDataManager,
-      OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo) throws 
IOException {
-    // Acquiring write lock to avoid race condition with sst filtering service 
which creates a sst filtered file
-    // inside the snapshot directory. Any operation apart which doesn't 
create/delete files under this snapshot
-    // directory can run in parallel along with this operation.
-    OMLockDetails omLockDetails = omMetadataManager.getLock()
-        .acquireWriteLock(SNAPSHOT_DB_LOCK, 
snapshotInfo.getSnapshotId().toString());
-    boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
-    if (acquiredSnapshotLock) {
-      try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataMetaProvider 
snapMetaProvider =
-               
snapshotLocalDataManager.getOmSnapshotLocalDataMeta(snapshotInfo)) {
-        Path snapshotDirPath = 
OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo,
-            snapMetaProvider.getMeta().getVersion());
-        try {
-          FileUtils.deleteDirectory(snapshotDirPath.toFile());
-        } catch (IOException ex) {
-          LOG.error("Failed to delete snapshot directory {} for snapshot {}",
-              snapshotDirPath, snapshotInfo.getTableKey(), ex);
-        } finally {
-          omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_DB_LOCK, 
snapshotInfo.getSnapshotId().toString());
-        }
-      }
-    }
-  }
-
   @VisibleForTesting
   public Map<String, SnapshotInfo> getUpdatedSnapInfos() {
     return updatedSnapInfos;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index 3868436eb51..0699fdd39db 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -291,7 +291,10 @@ public void release(UUID key) {
    */
   public UncheckedAutoCloseableSupplier<OMLockDetails> lock() {
     return lock(() -> lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK),
-        () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK), () -> 
cleanup(true));
+        () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK), () -> {
+        cleanup(true);
+        return dbMap.isEmpty();
+      });
   }
 
   /**
@@ -303,7 +306,10 @@ public UncheckedAutoCloseableSupplier<OMLockDetails> 
lock() {
   public UncheckedAutoCloseableSupplier<OMLockDetails> lock(UUID snapshotId) {
     return lock(() -> lock.acquireWriteLock(SNAPSHOT_DB_LOCK, 
snapshotId.toString()),
         () -> lock.releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotId.toString()),
-        () -> cleanup(snapshotId));
+        () -> {
+        cleanup(snapshotId, false);
+        return !dbMap.containsKey(snapshotId);
+      });
   }
 
   private OMLockDetails getEmptyOmLockDetails(OMLockDetails lockDetails) {
@@ -311,14 +317,13 @@ private OMLockDetails getEmptyOmLockDetails(OMLockDetails 
lockDetails) {
   }
 
   private UncheckedAutoCloseableSupplier<OMLockDetails> 
lock(Supplier<OMLockDetails> lockFunction,
-      Supplier<OMLockDetails> unlockFunction, Supplier<Void> cleanupFunction) {
+      Supplier<OMLockDetails> unlockFunction, Supplier<Boolean> 
cleanupFunction) {
     Supplier<OMLockDetails> emptyLockFunction = () -> 
getEmptyOmLockDetails(lockFunction.get());
     Supplier<OMLockDetails> emptyUnlockFunction = () -> 
getEmptyOmLockDetails(unlockFunction.get());
 
     AtomicReference<OMLockDetails> lockDetails = new 
AtomicReference<>(emptyLockFunction.get());
     if (lockDetails.get().isLockAcquired()) {
-      cleanupFunction.get();
-      if (!dbMap.isEmpty()) {
+      if (!cleanupFunction.get()) {
         lockDetails.set(emptyUnlockFunction.get());
       }
     }
@@ -349,14 +354,19 @@ public OMLockDetails get() {
   private synchronized Void cleanup(boolean force) {
     if (force || dbMap.size() > cacheSizeLimit) {
       for (UUID evictionKey : pendingEvictionQueue) {
-        cleanup(evictionKey);
+        cleanup(evictionKey, true);
       }
     }
     return null;
   }
 
-  private synchronized Void cleanup(UUID evictionKey) {
+  private synchronized Void cleanup(UUID evictionKey, boolean 
expectKeyToBePresent) {
     ReferenceCounted<OmSnapshot> snapshot = dbMap.get(evictionKey);
+
+    if (!expectKeyToBePresent && snapshot == null) {
+      return null;
+    }
+
     if (snapshot != null && snapshot.getTotalRefCount() == 0) {
       try {
         compactSnapshotDB(snapshot.get());
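
The refactored lock(...) helper above now lets the cleanup callback report whether the guarded entries were actually evicted, and the write lock is kept only in that case; otherwise it is released immediately. A generic sketch of that contract (hypothetical names, not the SnapshotCache API):

import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

public final class ConditionalLockSketch {
  private final ReentrantLock lock = new ReentrantLock();

  // Returns true if the caller now holds the lock (cleanup fully evicted the guarded state);
  // false if cleanup left entries behind, in which case the lock was released again.
  public boolean lockIfCleaned(BooleanSupplier cleanup) {
    lock.lock();
    boolean keep = false;
    try {
      keep = cleanup.getAsBoolean();   // e.g. "is the snapshot handle gone from the cache map?"
      return keep;
    } finally {
      if (!keep) {
        lock.unlock();                 // somebody still holds a reference; give the lock back
      }
    }
  }
}
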
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
index 9be4a0d3389..c54f405e242 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
@@ -17,42 +17,80 @@
 
 package org.apache.hadoop.ozone.om.snapshot.defrag;
 
+import static java.nio.file.Files.createDirectories;
+import static org.apache.commons.io.file.PathUtils.deleteDirectory;
+import static 
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
-import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_GC_LOCK;
+import static 
org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT;
+import static 
org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_CONTENT_LOCK;
+import static 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Collections;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecException;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.SstFilteringService;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
+import 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import 
org.apache.hadoop.ozone.om.snapshot.diff.delta.CompositeDeltaDiffComputer;
+import org.apache.hadoop.ozone.om.snapshot.util.TableMergeIterator;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.apache.logging.log4j.util.Strings;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
+import org.apache.ozone.rocksdb.util.SstFileSetReader;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,16 +124,24 @@ public class SnapshotDefragService extends 
BackgroundService
   private final AtomicLong snapshotsDefraggedCount;
   private final AtomicBoolean running;
 
-  private final MultiSnapshotLocks snapshotIdLocks;
+  private final MultiSnapshotLocks snapshotContentLocks;
   private final OzoneConfiguration conf;
 
   private final BootstrapStateHandler.Lock lock = new 
BootstrapStateHandler.Lock();
+  private final String tmpDefragDir;
+  private final OmSnapshotManager omSnapshotManager;
+  private final OmSnapshotLocalDataManager snapshotLocalDataManager;
+  private final List<UUID> lockIds;
+  private final CompositeDeltaDiffComputer deltaDiffComputer;
+  private final Path differTmpDir;
 
   public SnapshotDefragService(long interval, TimeUnit unit, long 
serviceTimeout,
-      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+      OzoneManager ozoneManager, OzoneConfiguration configuration) throws 
IOException {
     super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
         serviceTimeout, ozoneManager.getThreadNamePrefix());
     this.ozoneManager = ozoneManager;
+    this.omSnapshotManager = ozoneManager.getOmSnapshotManager();
+    this.snapshotLocalDataManager = 
omSnapshotManager.getSnapshotLocalDataManager();
     this.snapshotLimitPerTask = configuration
         .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
             SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
@@ -103,7 +149,22 @@ public SnapshotDefragService(long interval, TimeUnit unit, 
long serviceTimeout,
     snapshotsDefraggedCount = new AtomicLong(0);
     running = new AtomicBoolean(false);
     IOzoneManagerLock omLock = ozoneManager.getMetadataManager().getLock();
-    this.snapshotIdLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, 
true, 1);
+    this.snapshotContentLocks = new MultiSnapshotLocks(omLock, 
SNAPSHOT_DB_CONTENT_LOCK, true, 1);
+    Path tmpDefragDirPath = 
ozoneManager.getMetadataManager().getSnapshotParentDir().toAbsolutePath()
+        .resolve("tmp_defrag");
+    // Delete and recreate tmp dir if it exists
+    if (tmpDefragDirPath.toFile().exists()) {
+      deleteDirectory(tmpDefragDirPath);
+    }
+    createDirectories(tmpDefragDirPath);
+    this.tmpDefragDir = tmpDefragDirPath.toString();
+    this.differTmpDir = tmpDefragDirPath.resolve("differSstFiles");
+
+    this.deltaDiffComputer = new CompositeDeltaDiffComputer(omSnapshotManager,
+        ozoneManager.getMetadataManager(), differTmpDir, (status) -> {
+      LOG.debug("Snapshot defragmentation diff status: {}", status);
+    }, false, !isRocksToolsNativeLibAvailable());
+    this.lockIds = new ArrayList<>(1);
   }
 
   @Override
@@ -135,48 +196,318 @@ private boolean isRocksToolsNativeLibAvailable() {
   }
 
   /**
-   * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+   * Determines whether the specified snapshot requires defragmentation and 
returns
+   * a pair indicating the need for defragmentation and the corresponding 
version of the snapshot.
+   *
+   * @param snapshotInfo Information about the snapshot to be checked for 
defragmentation.
+   * @return A pair containing a boolean value and an integer:
+   *         - The boolean value indicates whether the snapshot requires 
defragmentation
+   *         (true if needed, false otherwise).
+   *         - The integer represents the version of the snapshot being 
evaluated.
+   * @throws IOException If an I/O error occurs while accessing the local 
snapshot data or metadata.
    */
-  private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
-    if (!SstFilteringService.isSstFiltered(conf, snapshotInfo)) {
-      return false;
-    }
-    try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider 
readableOmSnapshotLocalDataProvider =
-             
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo))
 {
-      Path snapshotPath = OmSnapshotManager.getSnapshotPath(
-          ozoneManager.getMetadataManager(), snapshotInfo,
-          
readableOmSnapshotLocalDataProvider.getSnapshotLocalData().getVersion());
+  @VisibleForTesting
+  Pair<Boolean, Integer> needsDefragmentation(SnapshotInfo snapshotInfo) 
throws IOException {
+    // Update snapshot local metadata to point to the correct previous 
snapshotId if it was different and check if
+    // snapshot needs defrag.
+    try (WritableOmSnapshotLocalDataProvider 
writableOmSnapshotLocalDataProvider =
+             
snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)) {
       // Read snapshot local metadata from YAML
       // Check if snapshot needs compaction (defragmentation)
-      boolean needsDefrag = readableOmSnapshotLocalDataProvider.needsDefrag();
-      LOG.debug("Snapshot {} needsDefragmentation field value: {}", 
snapshotInfo.getName(), needsDefrag);
+      writableOmSnapshotLocalDataProvider.commit();
+      boolean needsDefrag = writableOmSnapshotLocalDataProvider.needsDefrag();
+      OmSnapshotLocalData localData = 
writableOmSnapshotLocalDataProvider.getSnapshotLocalData();
+      if (!needsDefrag) {
+        OmSnapshotLocalData previousLocalData = 
writableOmSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+        LOG.debug("Skipping defragmentation since snapshot has already been 
defragmented: id : {}(version: {}=>{}) " +
+                "previousId: {}(version: {})", snapshotInfo.getSnapshotId(), 
localData.getVersion(),
+            
localData.getVersionSstFileInfos().get(localData.getVersion()).getPreviousSnapshotVersion(),
+            snapshotInfo.getPathPreviousSnapshotId(), 
previousLocalData.getVersion());
+      } else {
+        LOG.debug("Snapshot {} needsDefragmentation field value: true", 
snapshotInfo.getSnapshotId());
+      }
+      return Pair.of(needsDefrag, localData.getVersion());
+    }
+  }
 
-      return needsDefrag;
-    } catch (IOException e) {
-      LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag 
needed",
-          snapshotInfo.getName(), e);
-      return true;
+  private Pair<String, String> getTableBounds(Table<String, ?> table) throws 
RocksDatabaseException, CodecException {
+    String tableLowestValue = null, tableHighestValue = null;
+    try (TableIterator<String, String> keyIterator = table.keyIterator()) {
+      if (keyIterator.hasNext()) {
+        // Setting the lowest value to the first key in the table.
+        tableLowestValue = keyIterator.next();
+      }
+      keyIterator.seekToLast();
+      if (keyIterator.hasNext()) {
+        // Setting the highest value to the last key in the table.
+        tableHighestValue = keyIterator.next();
+      }
     }
+    return Pair.of(tableLowestValue, tableHighestValue);
   }
 
   /**
-   * Performs full defragmentation for the first snapshot in the chain.
-   * This is a simplified implementation that demonstrates the concept.
+   * Performs a full defragmentation process for specified tables in the 
metadata manager.
+   * This method processes all the entries in the tables for the provided 
prefix information,
+   * deletes specified key ranges, and compacts the tables to remove 
tombstones.
+   *
+   * @param checkpointDBStore the metadata manager responsible for managing 
tables during the checkpoint process
+   * @param prefixInfo the prefix information used to identify bucket prefix 
and determine key ranges in the tables
+   * @param incrementalTables the set of tables for which incremental 
defragmentation is performed.
+   * @throws IOException if an I/O error occurs during table operations or 
compaction
    */
-  private void performFullDefragmentation(SnapshotInfo snapshotInfo,
-      OmSnapshot omSnapshot) throws IOException {
+  @VisibleForTesting
+  void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo 
prefixInfo,
+      Set<String> incrementalTables) throws IOException {
+    for (String table : incrementalTables) {
+      Table<String, byte[]> checkpointTable = 
checkpointDBStore.getTable(table, StringCodec.get(),
+          ByteArrayCodec.get());
+      String tableBucketPrefix = prefixInfo.getTablePrefix(table);
+      String prefixUpperBound = 
getLexicographicallyHigherString(tableBucketPrefix);
+
+      Pair<String, String> tableBounds = getTableBounds(checkpointTable);
+      String tableLowestValue = tableBounds.getLeft();
+      String tableHighestValue = tableBounds.getRight();
+
+      // If the lowest key is non-null and smaller than the table's bucket prefix, delete the range between
+      // the lowest key and the bucket prefix.
+      if (tableLowestValue != null && tableLowestValue.compareTo(tableBucketPrefix) < 0) {
+        checkpointTable.deleteRange(tableLowestValue, tableBucketPrefix);
+      }
+      // If the highest key is non-null and greater than or equal to the next lexicographically higher string
+      // of the bucket prefix, delete the range between that upper bound and the highest key, then delete the
+      // highest key itself since deleteRange is end-exclusive.
+      if (tableHighestValue != null && tableHighestValue.compareTo(prefixUpperBound) >= 0) {
+        checkpointTable.deleteRange(prefixUpperBound, tableHighestValue);
+        checkpointTable.delete(tableHighestValue);
+      }
+      // Compact the table completely with kForce to get rid of tombstones.
+      try (ManagedCompactRangeOptions compactRangeOptions = new 
ManagedCompactRangeOptions()) {
+        
compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
+        compactRangeOptions.setExclusiveManualCompaction(true);
+        checkpointDBStore.compactTable(table, compactRangeOptions);
+      }
+    }
+  }
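
The range trimming in performFullDefragmentation keeps only the keys under the bucket prefix: everything strictly below the prefix and everything at or above the prefix's lexicographic upper bound is removed with deleteRange, and the highest key is deleted separately because deleteRange is end-exclusive. A small sketch of that logic against a hypothetical interface (the upperBound helper below is a simplified stand-in for getLexicographicallyHigherString and ignores character overflow):

public final class PrefixTrimSketch {

  // Hypothetical stand-in for a table that supports RocksDB-style end-exclusive deleteRange.
  interface KeyRangeDeleter {
    void deleteRange(String beginInclusive, String endExclusive);
    void delete(String key);
  }

  // Next lexicographically higher string: bump the last character (sketch only, no overflow handling).
  static String upperBound(String prefix) {
    char[] chars = prefix.toCharArray();
    chars[chars.length - 1]++;
    return new String(chars);
  }

  static void trimToPrefix(KeyRangeDeleter table, String lowest, String highest, String prefix) {
    String bound = upperBound(prefix);
    if (lowest != null && lowest.compareTo(prefix) < 0) {
      // Everything strictly below the prefix is outside the bucket.
      table.deleteRange(lowest, prefix);
    }
    if (highest != null && highest.compareTo(bound) >= 0) {
      // deleteRange is end-exclusive, so the highest key must be deleted separately.
      table.deleteRange(bound, highest);
      table.delete(highest);
    }
  }
}
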
 
-    // TODO: Implement full defragmentation
+  /**
+   * Spills table difference into an SST file based on the provided delta file 
paths,
+   * current snapshot table, previous snapshot table, and an optional table 
key prefix.
+   *
+   * The method reads the delta files and compares the records against the 
snapshot tables.
+   * Any differences, including tombstones (deleted entries), are written to a 
new SST file.
+   *
+   * @param deltaFilePaths the list of paths to the delta files to process
+   * @param snapshotTable the current snapshot table for comparison
+   * @param previousSnapshotTable the previous snapshot table for comparison
+   * @param tableKeyPrefix the prefix for filtering certain keys, or null if 
all keys are to be included
+   * @return a pair of the path of the created SST file containing the 
differences and a boolean
+   *         indicating whether any delta entries were written (true if there 
are differences, false otherwise)
+   * @throws IOException if an I/O error occurs during processing
+   */
+  @VisibleForTesting
+  Pair<Path, Boolean> spillTableDiffIntoSstFile(List<Path> deltaFilePaths, 
Table<String, byte[]> snapshotTable,
+      Table<String, byte[]> previousSnapshotTable, String tableKeyPrefix) 
throws IOException {
+    String sstFileReaderUpperBound = null;
+    if (Strings.isNotEmpty(tableKeyPrefix)) {
+      sstFileReaderUpperBound = 
getLexicographicallyHigherString(tableKeyPrefix);
+    }
+    SstFileSetReader sstFileSetReader = new SstFileSetReader(deltaFilePaths);
+    Path fileToBeIngested = differTmpDir.resolve(snapshotTable.getName() + "-" 
+ UUID.randomUUID()
+        + SST_FILE_EXTENSION);
+    int deltaEntriesCount = 0;
+    try (ClosableIterator<String> keysToCheck =
+             sstFileSetReader.getKeyStreamWithTombstone(tableKeyPrefix, 
sstFileReaderUpperBound);
+         TableMergeIterator<String, byte[]> tableMergeIterator = new 
TableMergeIterator<>(keysToCheck,
+             tableKeyPrefix, snapshotTable, previousSnapshotTable);
+         RDBSstFileWriter rdbSstFileWriter = new 
RDBSstFileWriter(fileToBeIngested.toFile())) {
+      while (tableMergeIterator.hasNext()) {
+        Table.KeyValue<String, List<byte[]>> kvs = tableMergeIterator.next();
+        // Check if the values are equal or if they are not equal then the 
value should be written to the
+        // delta sstFile.
+        if (!Arrays.equals(kvs.getValue().get(0), kvs.getValue().get(1))) {
+          try (CodecBuffer key = 
StringCodec.get().toHeapCodecBuffer(kvs.getKey())) {
+            byte[] keyArray = key.asReadOnlyByteBuffer().array();
+            byte[] val = kvs.getValue().get(0);
+            // If the value is null then add a tombstone to the delta sstFile.
+            if (val == null) {
+              rdbSstFileWriter.delete(keyArray);
+            } else {
+              rdbSstFileWriter.put(keyArray, val);
+            }
+          }
+          deltaEntriesCount++;
+        }
+      }
+    } catch (RocksDBException e) {
+      throw new RocksDatabaseException("Error while reading sst files.", e);
+    }
+    // If there are no delta entries, there is nothing worth ingesting; the caller deletes the file along
+    // with the other temporary delta files.
+    return Pair.of(fileToBeIngested, deltaEntriesCount != 0);
   }
 
   /**
-   * Performs incremental defragmentation using diff from previous 
defragmented snapshot.
+   * Performs an incremental defragmentation process, which involves 
determining
+   * and processing delta files between snapshots for metadata updates. The 
method
+   * computes the changes, manages file ingestion to the checkpoint metadata 
manager,
+   * and ensures that all delta files are deleted after processing.
+   *
+   * @param previousSnapshotInfo information about the previous snapshot.
+   * @param snapshotInfo information about the current snapshot for which
+   *                     incremental defragmentation is performed.
+   * @param snapshotVersion the version of the snapshot to be processed.
+   * @param checkpointStore the dbStore instance where data
+   *                        updates are ingested after being processed.
+   * @param bucketPrefixInfo table prefix information associated with buckets,
+   *                         used to determine bounds for processing keys.
+   * @param incrementalTables the set of tables for which incremental 
defragmentation is performed.
+   * @throws IOException if an I/O error occurs during processing.
    */
-  private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
-      SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+  @VisibleForTesting
+  void performIncrementalDefragmentation(SnapshotInfo previousSnapshotInfo, 
SnapshotInfo snapshotInfo,
+      int snapshotVersion, DBStore checkpointStore, TablePrefixInfo 
bucketPrefixInfo, Set<String> incrementalTables)
       throws IOException {
+    // Map of delta files grouped on the basis of the tableName.
+    Collection<Pair<Path, SstFileInfo>> allTableDeltaFiles = 
this.deltaDiffComputer.getDeltaFiles(
+        previousSnapshotInfo, snapshotInfo, incrementalTables);
+
+    Map<String, List<Path>> tableGroupedDeltaFiles = 
allTableDeltaFiles.stream()
+        .collect(Collectors.groupingBy(pair -> 
pair.getValue().getColumnFamily(),
+            Collectors.mapping(Pair::getKey, Collectors.toList())));
+
+    String volumeName = snapshotInfo.getVolumeName();
+    String bucketName = snapshotInfo.getBucketName();
+
+    Set<Path> filesToBeDeleted = new HashSet<>();
+    // All files computed as delta must be deleted irrespective of whether 
ingestion succeeded or not.
+    allTableDeltaFiles.forEach(pair -> filesToBeDeleted.add(pair.getKey()));
+    try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
+             omSnapshotManager.getActiveSnapshot(volumeName, bucketName, 
snapshotInfo.getName());
+         UncheckedAutoCloseableSupplier<OmSnapshot> previousSnapshot =
+             omSnapshotManager.getActiveSnapshot(volumeName, bucketName, 
previousSnapshotInfo.getName())) {
+      for (Map.Entry<String, List<Path>> entry : 
tableGroupedDeltaFiles.entrySet()) {
+        String table = entry.getKey();
+        List<Path> deltaFiles = entry.getValue();
+        Path fileToBeIngested;
+        if (deltaFiles.size() == 1 && snapshotVersion > 0) {
+          // If there is only one delta file for the table and the snapshot version is not 0, then that
+          // delta file can be re-ingested into the checkpointStore as-is.
+          fileToBeIngested = deltaFiles.get(0);
+        } else {
+          Table<String, byte[]> snapshotTable = 
snapshot.get().getMetadataManager().getStore()
+              .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+          Table<String, byte[]> previousSnapshotTable = 
previousSnapshot.get().getMetadataManager().getStore()
+              .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+          String tableBucketPrefix = bucketPrefixInfo.getTablePrefix(table);
+          Pair<Path, Boolean> spillResult = 
spillTableDiffIntoSstFile(deltaFiles, snapshotTable,
+              previousSnapshotTable, tableBucketPrefix);
+          fileToBeIngested = spillResult.getValue() ? spillResult.getLeft() : 
null;
+          filesToBeDeleted.add(spillResult.getLeft());
+        }
+        if (fileToBeIngested != null) {
+          if (!fileToBeIngested.toFile().exists()) {
+            throw new IOException("Delta file does not exist: " + 
fileToBeIngested);
+          }
+          Table checkpointTable = checkpointStore.getTable(table);
+          checkpointTable.loadFromFile(fileToBeIngested.toFile());
+        }
+      }
+    } finally {
+      for (Path path : filesToBeDeleted) {
+        if (path.toFile().exists()) {
+          if (!path.toFile().delete()) {
+            LOG.warn("Failed to delete file: {}", path);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Ingests non-incremental tables from a snapshot into a checkpoint database 
store.
+   * This involves exporting table data from the snapshot to intermediate SST 
files
+   * and ingesting them into the corresponding tables in the checkpoint 
database store.
+   * Tables that are part of incremental defragmentation are excluded from 
this process.
+   *
+   * @param checkpointDBStore the database store where non-incremental tables 
are ingested.
+   * @param snapshotInfo the metadata information of the snapshot being 
processed.
+   * @param bucketPrefixInfo prefix information used for determining table 
prefixes.
+   * @param incrementalTables the set of tables identified for incremental 
defragmentation.
+   * @throws IOException if an I/O error occurs during table ingestion or file 
operations.
+   */
+  @VisibleForTesting
+  void ingestNonIncrementalTables(DBStore checkpointDBStore,
+      SnapshotInfo snapshotInfo, TablePrefixInfo bucketPrefixInfo, Set<String> 
incrementalTables) throws IOException {
+    String volumeName = snapshotInfo.getVolumeName();
+    String bucketName = snapshotInfo.getBucketName();
+    String snapshotName = snapshotInfo.getName();
+    Set<Path> filesToBeDeleted = new HashSet<>();
+    try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = 
omSnapshotManager.getActiveSnapshot(volumeName,
+        bucketName, snapshotName)) {
+      DBStore snapshotDBStore = snapshot.get().getMetadataManager().getStore();
+      for (Table snapshotTable : snapshotDBStore.listTables()) {
+        String snapshotTableName = snapshotTable.getName();
+        if (!incrementalTables.contains(snapshotTable.getName())) {
+          Path tmpSstFile = differTmpDir.resolve(snapshotTable.getName() + "-" 
+ UUID.randomUUID()
+              + SST_FILE_EXTENSION);
+          filesToBeDeleted.add(tmpSstFile);
+          String prefix = bucketPrefixInfo.getTablePrefix(snapshotTableName);
+          byte[] prefixBytes = Strings.isBlank(prefix) ? null : 
StringCodec.get().toPersistedFormat(prefix);
+          Table<byte[], byte[]> snapshotTableBytes = 
snapshotDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
+              ByteArrayCodec.get());
+          snapshotTableBytes.dumpToFileWithPrefix(tmpSstFile.toFile(), 
prefixBytes);
+          Table<byte[], byte[]> checkpointTable = 
checkpointDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
+              ByteArrayCodec.get());
+          checkpointTable.loadFromFile(tmpSstFile.toFile());
+        }
+      }
+    } finally {
+      for (Path path : filesToBeDeleted) {
+        if (path.toFile().exists()) {
+          if (!path.toFile().delete()) {
+            LOG.warn("Failed to delete file for ingesting non incremental 
table: {}", path);
+          }
+        }
+      }
+    }
+  }
 
-    // TODO: Implement incremental defragmentation
+  /**
+   * Atomically switches the current snapshot database to a new version derived
+   * from the provided checkpoint directory. This involves moving the 
checkpoint
+   * path to a versioned directory, updating the snapshot metadata, and 
committing
+   * the changes to persist the snapshot version update.
+   *
+   * @param snapshotId The UUID identifying the snapshot to update.
+   * @param checkpointPath The path to the checkpoint directory that serves as 
the basis
+   *                       for the updated snapshot version.
+   * @return The previous version number of the snapshot prior to the update.
+   * @throws IOException If an I/O error occurs during file operations, 
checkpoint processing,
+   *                     or snapshot metadata updates.
+   */
+  @VisibleForTesting
+  int atomicSwitchSnapshotDB(UUID snapshotId, Path checkpointPath) throws 
IOException {
+    try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider =
+             
snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)) {
+      OmSnapshotLocalData localData = 
snapshotLocalDataProvider.getSnapshotLocalData();
+      Path nextVersionPath = 
OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId,
+          localData.getVersion() + 1);
+      // Remove the directory if it exists.
+      if (nextVersionPath.toFile().exists()) {
+        deleteDirectory(nextVersionPath);
+      }
+      // Move the checkpoint directory to the next version directory.
+      Files.move(checkpointPath, nextVersionPath);
+      RocksDBCheckpoint dbCheckpoint = new RocksDBCheckpoint(nextVersionPath);
+      // Add a new version to the local data file.
+      try (OmMetadataManagerImpl newVersionCheckpointMetadataManager =
+               OmMetadataManagerImpl.createCheckpointMetadataManager(conf, 
dbCheckpoint, true)) {
+        RDBStore newVersionCheckpointStore = (RDBStore) 
newVersionCheckpointMetadataManager.getStore();
+        
snapshotLocalDataProvider.addSnapshotVersion(newVersionCheckpointStore);
+        snapshotLocalDataProvider.commit();
+      }
+      return localData.getVersion() - 1;
+    }
   }
 
   private final class SnapshotDefragTask implements BackgroundTask {
@@ -192,6 +523,105 @@ public BackgroundTaskResult call() throws Exception {
     }
   }
 
+  /**
+   * Creates a new checkpoint by modifying the metadata manager from a 
snapshot.
+   * This involves generating a temporary checkpoint and truncating specified
+   * column families from the checkpoint before returning the updated metadata 
manager.
+   *
+   * @param snapshotInfo Information about the snapshot for which the 
checkpoint
+   *                     is being created.
+   * @param incrementalColumnFamilies A set of table names representing 
incremental
+   *                                   column families to be retained in the 
checkpoint.
+   * @return A new instance of OmMetadataManagerImpl initialized with the 
modified
+   *         checkpoint.
+   * @throws IOException If an I/O error occurs during snapshot processing,
+   *                     checkpoint creation, or table operations.
+   */
+  @VisibleForTesting
+  OmMetadataManagerImpl createCheckpoint(SnapshotInfo snapshotInfo,
+      Set<String> incrementalColumnFamilies) throws IOException {
+    try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = 
omSnapshotManager.getActiveSnapshot(
+        snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), 
snapshotInfo.getName())) {
+      DBCheckpoint checkpoint = 
snapshot.get().getMetadataManager().getStore().getCheckpoint(tmpDefragDir, 
true);
+      try (OmMetadataManagerImpl metadataManagerBeforeTruncate =
+               OmMetadataManagerImpl.createCheckpointMetadataManager(conf, 
checkpoint, false)) {
+        DBStore dbStore = metadataManagerBeforeTruncate.getStore();
+        for (String table : metadataManagerBeforeTruncate.listTableNames()) {
+          if (!incrementalColumnFamilies.contains(table)) {
+            dbStore.dropTable(table);
+          }
+        }
+      } catch (Exception e) {
+        throw new IOException("Failed to close checkpoint of snapshot: " + 
snapshotInfo.getSnapshotId(), e);
+      }
+      // This will recreate the column families in the checkpoint.
+      return OmMetadataManagerImpl.createCheckpointMetadataManager(conf, 
checkpoint, false);
+    }
+  }
+
+  private void acquireContentLock(UUID snapshotID) throws IOException {
+    lockIds.clear();
+    lockIds.add(snapshotID);
+    OMLockDetails lockDetails = snapshotContentLocks.acquireLock(lockIds);
+    if (!lockDetails.isLockAcquired()) {
+      throw new IOException("Failed to acquire lock on snapshot: " + 
snapshotID);
+    }
+    LOG.debug("Acquired MultiSnapshotLocks on snapshot: {}", snapshotID);
+  }
+
+  private boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, 
UUID snapshotId) throws IOException {
+    SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
chainManager, snapshotId);
+
+    if (snapshotInfo.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
+      LOG.debug("Skipping defragmentation for non-active snapshot: {} (ID: 
{})",
+          snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+      return false;
+    }
+    Pair<Boolean, Integer> needsDefragVersionPair = 
needsDefragmentation(snapshotInfo);
+    if (!needsDefragVersionPair.getLeft()) {
+      return false;
+    }
+    // Create a checkpoint of the previous snapshot or the current snapshot if 
it is the first snapshot in the chain.
+    SnapshotInfo checkpointSnapshotInfo = 
snapshotInfo.getPathPreviousSnapshotId() == null ? snapshotInfo :
+        SnapshotUtils.getSnapshotInfo(ozoneManager, chainManager, 
snapshotInfo.getPathPreviousSnapshotId());
+
+    OmMetadataManagerImpl checkpointMetadataManager = 
createCheckpoint(checkpointSnapshotInfo,
+        COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+    Path checkpointLocation = 
checkpointMetadataManager.getStore().getDbLocation().toPath();
+    try {
+      DBStore checkpointDBStore = checkpointMetadataManager.getStore();
+      TablePrefixInfo prefixInfo = 
ozoneManager.getMetadataManager().getTableBucketPrefix(snapshotInfo.getVolumeName(),
+          snapshotInfo.getBucketName());
+      // If first snapshot in the chain perform full defragmentation.
+      if (snapshotInfo.getPathPreviousSnapshotId() == null) {
+        performFullDefragmentation(checkpointDBStore, prefixInfo, 
COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+      } else {
+        performIncrementalDefragmentation(checkpointSnapshotInfo, 
snapshotInfo, needsDefragVersionPair.getValue(),
+            checkpointDBStore, prefixInfo, 
COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+      }
+      int previousVersion;
+      // Acquire the content lock on the snapshot to ensure the contents of the tables don't get changed.
+      acquireContentLock(snapshotId);
+      try {
+        // Ingestion of the incremental tables (KeyTable/FileTable/DirectoryTable) is done; now re-ingest the
+        // remaining tables from the original snapshot.
+        ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, 
prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+        checkpointMetadataManager.close();
+        checkpointMetadataManager = null;
+        // Switch the snapshot DB location to the new version.
+        previousVersion = atomicSwitchSnapshotDB(snapshotId, checkpointLocation);
+      } finally {
+        snapshotContentLocks.releaseLock();
+      }
+      omSnapshotManager.deleteSnapshotCheckpointDirectories(snapshotId, 
previousVersion);
+    } finally {
+      if (checkpointMetadataManager != null) {
+        checkpointMetadataManager.close();
+      }
+    }
+    return true;
+  }
+
   public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
 
     final long count = runCount.incrementAndGet();
@@ -217,119 +647,26 @@ public synchronized boolean triggerSnapshotDefragOnce() 
throws IOException {
     final SnapshotChainManager snapshotChainManager =
         ((OmMetadataManagerImpl) 
ozoneManager.getMetadataManager()).getSnapshotChainManager();
 
-    final Table<String, SnapshotInfo> snapshotInfoTable =
-        ozoneManager.getMetadataManager().getSnapshotInfoTable();
-
     // Use iterator(false) to iterate forward through the snapshot chain
     Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
 
     long snapshotLimit = snapshotLimitPerTask;
-
     while (snapshotLimit > 0 && running.get() && snapshotIterator.hasNext()) {
-      // Get SnapshotInfo for the current snapshot in the chain
+      // Get SnapshotInfo for the current snapshot in the chain.
       UUID snapshotId = snapshotIterator.next();
-      String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
-      SnapshotInfo snapshotToDefrag = snapshotInfoTable.get(snapshotTableKey);
-      if (snapshotToDefrag == null) {
-        LOG.warn("Snapshot with ID '{}' not found in snapshot info table", 
snapshotId);
-        continue;
-      }
-
-      // Skip deleted snapshots
-      if (snapshotToDefrag.getSnapshotStatus() == 
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
-        LOG.debug("Skipping deleted snapshot: {} (ID: {})",
-            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-        continue;
-      }
-
-      // Check if this snapshot needs defragmentation
-      if (!needsDefragmentation(snapshotToDefrag)) {
-        LOG.debug("Skipping already defragged snapshot: {} (ID: {})",
-            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-        continue;
-      }
-
-      LOG.info("Will defrag snapshot: {} (ID: {})",
-          snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-
-      // Acquire MultiSnapshotLocks
-      if (!snapshotIdLocks.acquireLock(Collections.singletonList(snapshotToDefrag.getSnapshotId()))
-          .isLockAcquired()) {
-        LOG.error("Abort. Failed to acquire lock on snapshot: {} (ID: {})",
-            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-        break;
-      }
-
-      try {
-        LOG.info("Processing snapshot defragmentation for: {} (ID: {})",
-            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-
-        // Get snapshot through SnapshotCache for proper locking
-        try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier =
-                 snapshotManager.get().getSnapshot(snapshotToDefrag.getSnapshotId())) {
-
-          OmSnapshot omSnapshot = snapshotSupplier.get();
-
-          UUID pathPreviousSnapshotId = snapshotToDefrag.getPathPreviousSnapshotId();
-          boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null;
-          if (isFirstSnapshotInPath) {
-            LOG.info("Performing full defragmentation for first snapshot (in 
path): {}",
-                snapshotToDefrag.getName());
-            performFullDefragmentation(snapshotToDefrag, omSnapshot);
-          } else {
-            final String psIdtableKey = snapshotChainManager.getTableKey(pathPreviousSnapshotId);
-            SnapshotInfo previousDefraggedSnapshot = snapshotInfoTable.get(psIdtableKey);
-
-            LOG.info("Performing incremental defragmentation for snapshot: {} 
" +
-                    "based on previous defragmented snapshot: {}",
-                snapshotToDefrag.getName(), 
previousDefraggedSnapshot.getName());
-
-            // If previous path snapshot is not null, it must have been defragmented already
-            // Sanity check to ensure previous snapshot exists and is defragmented
-            if (needsDefragmentation(previousDefraggedSnapshot)) {
-              LOG.error("Fatal error before defragging snapshot: {}. " +
-                      "Previous snapshot in path {} was not defragged while it 
is expected to be.",
-                  snapshotToDefrag.getName(), 
previousDefraggedSnapshot.getName());
-              break;
-            }
-
-            performIncrementalDefragmentation(snapshotToDefrag,
-                previousDefraggedSnapshot, omSnapshot);
-          }
-
-          // TODO: Update snapshot metadata here?
-
-          // Close and evict the original snapshot DB from SnapshotCache
-          // TODO: Implement proper eviction from SnapshotCache
-          LOG.info("Defragmentation completed for snapshot: {}",
-              snapshotToDefrag.getName());
-
+      try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock()) {
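+        // Defragment at most one snapshot per loop iteration while holding the bootstrap state read
+        // lock, so the work does not overlap with an OM bootstrap checkpoint transfer.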
+        if (checkAndDefragSnapshot(snapshotChainManager, snapshotId)) {
           snapshotLimit--;
           snapshotsDefraggedCount.getAndIncrement();
-
-        } catch (OMException ome) {
-          if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
-            LOG.info("Snapshot {} was deleted during defragmentation",
-                snapshotToDefrag.getName());
-          } else {
-            LOG.error("OMException during snapshot defragmentation for: {}",
-                snapshotToDefrag.getName(), ome);
-          }
         }
-
-      } catch (Exception e) {
-        LOG.error("Exception during snapshot defragmentation for: {}",
-            snapshotToDefrag.getName(), e);
+      } catch (IOException e) {
+        LOG.error("Exception while defragmenting snapshot: {}", snapshotId, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.error("Snapshot defragmentation task interrupted", e);
         return false;
-      } finally {
-        // Release lock MultiSnapshotLocks
-        snapshotIdLocks.releaseLock();
-        LOG.debug("Released MultiSnapshotLocks on snapshot: {} (ID: {})",
-            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-
       }
     }
-
     return true;
   }
 
@@ -371,6 +708,18 @@ public BootstrapStateHandler.Lock getBootstrapStateLock() {
   public void shutdown() {
     running.set(false);
     super.shutdown();
+    try {
+      deltaDiffComputer.close();
+    } catch (IOException e) {
+      LOG.error("Error while closing delta diff computer.", e);
+    }
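+    // Best-effort cleanup of the temporary working directory used during defragmentation.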
+    Path tmpDirPath = Paths.get(tmpDefragDir);
+    if (tmpDirPath.toFile().exists()) {
+      try {
+        deleteDirectory(tmpDirPath);
+      } catch (IOException e) {
+        LOG.error("Failed to delete temporary directory: {}", tmpDirPath, e);
+      }
+    }
   }
 }
-
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
new file mode 100644
index 00000000000..46fa02d21f3
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot.defrag;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider;
+import org.apache.hadoop.ozone.om.snapshot.diff.delta.CompositeDeltaDiffComputer;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
+import org.apache.hadoop.ozone.upgrade.LayoutFeature;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Unit tests for SnapshotDefragService.
+ */
+public class TestSnapshotDefragService {
+
+  @Mock
+  private OzoneManager ozoneManager;
+
+  @Mock
+  private OmSnapshotManager omSnapshotManager;
+
+  @Mock
+  private OmSnapshotLocalDataManager snapshotLocalDataManager;
+
+  @Mock
+  private OmMetadataManagerImpl metadataManager;
+
+  @Mock
+  private IOzoneManagerLock omLock;
+
+  @Mock
+  private OMLayoutVersionManager versionManager;
+
+  @TempDir
+  private Path tempDir;
+  private SnapshotDefragService defragService;
+  private AutoCloseable mocks;
+
+  @BeforeEach
+  public void setup() throws IOException {
+    mocks = MockitoAnnotations.openMocks(this);
+    OzoneConfiguration configuration = new OzoneConfiguration();
+
+    // Setup basic mocks
+    when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
+    when(ozoneManager.getMetadataManager()).thenReturn(metadataManager);
+    when(ozoneManager.getThreadNamePrefix()).thenReturn("TestOM");
+    when(ozoneManager.isRunning()).thenReturn(true);
+    when(ozoneManager.getVersionManager()).thenReturn(versionManager);
+    when(ozoneManager.getOmRatisServer()).thenReturn(mock(OzoneManagerRatisServer.class));
+
+    when(omSnapshotManager.getSnapshotLocalDataManager()).thenReturn(snapshotLocalDataManager);
+    when(metadataManager.getLock()).thenReturn(omLock);
+    when(metadataManager.getSnapshotParentDir()).thenReturn(tempDir);
+    when(versionManager.isAllowed(any(LayoutFeature.class))).thenReturn(true);
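+    // The service constructs a CompositeDeltaDiffComputer internally; mock its construction so the
+    // test does not need a real diff computer backing it.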
+    try (MockedConstruction<CompositeDeltaDiffComputer> compositeDeltaDiffComputer =
+             mockConstruction(CompositeDeltaDiffComputer.class)) {
+      // Initialize service
+      defragService = new SnapshotDefragService(
+          10000, // interval
+          TimeUnit.MILLISECONDS,
+          60000, // timeout
+          ozoneManager,
+          configuration
+      );
+      assertEquals(1, compositeDeltaDiffComputer.constructed().size());
+    }
+
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (defragService != null) {
+      defragService.shutdown();
+    }
+    if (mocks != null) {
+      mocks.close();
+    }
+  }
+
+  @Test
+  public void testServiceStartAndPause() {
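+    // Smoke test: the service should survive start/pause/resume transitions without throwing.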
+    defragService.start();
+    assertTrue(defragService.getSnapshotsDefraggedCount().get() >= 0);
+
+    defragService.pause();
+    assertNotNull(defragService);
+
+    defragService.resume();
+    assertNotNull(defragService);
+  }
+
+  @Test
+  public void testNeedsDefragmentationAlreadyDefragmented() throws IOException {
+    UUID snapshotId = UUID.randomUUID();
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, "vol1", "bucket1", "snap1");
+
+    WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class);
+    OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+    OmSnapshotLocalData previousLocalData = mock(OmSnapshotLocalData.class);
+
+    when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)).thenReturn(provider);
+    when(provider.needsDefrag()).thenReturn(false);
+    when(provider.getSnapshotLocalData()).thenReturn(localData);
+    when(provider.getPreviousSnapshotLocalData()).thenReturn(previousLocalData);
+    when(localData.getVersion()).thenReturn(1);
+    when(previousLocalData.getVersion()).thenReturn(0);
+
+    OmSnapshotLocalData.VersionMeta versionInfo = mock(OmSnapshotLocalData.VersionMeta.class);
+    when(versionInfo.getPreviousSnapshotVersion()).thenReturn(0);
+    Map<Integer, OmSnapshotLocalData.VersionMeta> versionMap = ImmutableMap.of(1, versionInfo);
+    when(localData.getVersionSstFileInfos()).thenReturn(versionMap);
+
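+    // needsDefrag() is false and the local data is already at version 1, so the service should report
+    // that no defragmentation is required and return the current version.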
+    Pair<Boolean, Integer> result = defragService.needsDefragmentation(snapshotInfo);
+
+    assertFalse(result.getLeft());
+    assertEquals(1, result.getRight());
+    verify(provider).commit();
+    verify(provider).close();
+  }
+
+  @Test
+  public void testNeedsDefragmentationRequiresDefrag() throws IOException {
+    UUID snapshotId = UUID.randomUUID();
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, "vol1", "bucket1", "snap1");
+
+    WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class);
+    OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+    AtomicInteger commit = new AtomicInteger(0);
+    when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)).thenReturn(provider);
+    when(provider.getSnapshotLocalData()).thenReturn(localData);
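+    // Count commits and report needsDefrag() as true only after the provider has been committed once,
+    // simulating local data that still requires a defrag pass.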
+    doAnswer(invocationOnMock -> {
+      commit.incrementAndGet();
+      return null;
+    }).when(provider).commit();
+    when(provider.needsDefrag()).thenAnswer(i -> commit.get() == 1);
+    int version = ThreadLocalRandom.current().nextInt(100);
+    when(localData.getVersion()).thenReturn(version);
+
+    Pair<Boolean, Integer> result = defragService.needsDefragmentation(snapshotInfo);
+
+    assertTrue(result.getLeft());
+    assertEquals(version, result.getRight());
+    verify(provider).close();
+  }
+
+  /**
+   * Helper method to create a mock SnapshotInfo.
+   */
+  private SnapshotInfo createMockSnapshotInfo(UUID snapshotId, String volume,
+                                                String bucket, String name) {
+    SnapshotInfo.Builder builder = SnapshotInfo.newBuilder();
+    builder.setSnapshotId(snapshotId);
+    builder.setVolumeName(volume);
+    builder.setBucketName(bucket);
+    builder.setName(name);
+    builder.setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE);
+    builder.setCreationTime(System.currentTimeMillis());
+    return builder.build();
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java
index 19579a59e16..6ff77765618 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java
@@ -527,3 +527,5 @@ private OmSnapshotLocalData createMockSnapshotLocalDataWithVersions(UUID snapsho
     return localData;
   }
 }
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
