This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 98abce2e62 HDDS-8315. [Snapshot] Added unit tests for 
SnapshotDiffManager (#4716)
98abce2e62 is described below

commit 98abce2e62396fcae554015d483d9d2c0fe0bb05
Author: Hemant Kumar <[email protected]>
AuthorDate: Thu Jun 22 18:57:46 2023 -0700

    HDDS-8315. [Snapshot] Added unit tests for SnapshotDiffManager (#4716)
---
 .../org/apache/hadoop/ozone/om/TestOmSnapshot.java |    2 +-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |    8 +-
 .../ozone/om/snapshot/SnapshotDiffManager.java     |  213 +--
 .../ozone/om/snapshot/TestSnapshotDiffManager.java | 1479 ++++++++++++--------
 4 files changed, 1058 insertions(+), 644 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
index 4195fa0994..2726ecc506 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
@@ -218,7 +218,7 @@ public class TestOmSnapshot {
 
     // stop the deletion services so that keys can still be read
     keyManager.stop();
-//    preFinalizationChecks();
+    preFinalizationChecks();
     finalizeOMUpgrade();
     counter = new AtomicInteger();
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index a1dbd2eaa7..c2d662eed5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -59,6 +59,8 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
 import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject;
 import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
@@ -125,7 +127,7 @@ public final class OmSnapshotManager implements 
AutoCloseable {
    * |  fromSnapshotId-toSnapshotId | SnapshotDiffJob |
    * |------------------------------------------------|
    */
-  private static final String SNAP_DIFF_JOB_TABLE_NAME =
+  public static final String SNAP_DIFF_JOB_TABLE_NAME =
       "snap-diff-job-table";
 
   /**
@@ -137,7 +139,7 @@ public final class OmSnapshotManager implements 
AutoCloseable {
    * |  jobId-index | DiffReportEntry |
    * |--------------------------------|
    */
-  private static final String SNAP_DIFF_REPORT_TABLE_NAME =
+  public static final String SNAP_DIFF_REPORT_TABLE_NAME =
       "snap-diff-report-table";
 
   /**
@@ -359,7 +361,7 @@ public final class OmSnapshotManager implements 
AutoCloseable {
     };
   }
 
-  private CodecRegistry createCodecRegistryForSnapDiff() {
+  private static CodecRegistry createCodecRegistryForSnapDiff() {
     final CodecRegistry.Builder registry = CodecRegistry.newBuilder();
     // DiffReportEntry codec for Diff Report.
     registry.addCodec(SnapshotDiffReportOzone.DiffReportEntry.class,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index d4a759e2cd..ac70c44d56 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -22,47 +22,21 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.io.file.PathUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import org.apache.commons.io.file.PathUtils;
-import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.ozone.OFSPath;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
+import org.apache.hadoop.ozone.OFSPath;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -75,14 +49,14 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.WithObjectID;
+import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
 import 
org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject.SnapshotDiffObjectBuilder;
-import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
-import org.apache.hadoop.util.ClosableIterator;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
-import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
+import org.apache.hadoop.util.ClosableIterator;
 import org.apache.ozone.rocksdb.util.ManagedSstFileReader;
 import org.apache.ozone.rocksdb.util.RdbUtil;
 import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
@@ -95,8 +69,38 @@ import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
+import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.CREATE;
+import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.DELETE;
+import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY;
+import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.RENAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME;
@@ -107,6 +111,9 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_THR
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
 import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.getTableKey;
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive;
@@ -119,9 +126,6 @@ import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED;
 
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
-
 /**
  * Class to generate snapshot diff.
  */
@@ -173,7 +177,7 @@ public class SnapshotDiffManager implements AutoCloseable {
   private final boolean snapshotForceFullDiff;
   private final Optional<ManagedSSTDumpTool> sstDumpTool;
 
-  private Optional<ExecutorService> sstDumptoolExecService;
+  private Optional<ExecutorService> sstDumpToolExecService;
 
   @SuppressWarnings("parameternumber")
   public SnapshotDiffManager(ManagedRocksDB db,
@@ -274,16 +278,16 @@ public class SnapshotDiffManager implements AutoCloseable 
{
           OMConfigKeys
               .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT,
               StorageUnit.BYTES);
-      this.sstDumptoolExecService = Optional.of(new ThreadPoolExecutor(0,
+      this.sstDumpToolExecService = Optional.of(new ThreadPoolExecutor(0,
               threadPoolSize, 60, TimeUnit.SECONDS,
               new SynchronousQueue<>(), new ThreadFactoryBuilder()
               .setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d")
               .build(),
               new ThreadPoolExecutor.DiscardPolicy()));
-      return Optional.of(new ManagedSSTDumpTool(sstDumptoolExecService.get(),
+      return Optional.of(new ManagedSSTDumpTool(sstDumpToolExecService.get(),
           bufferSize));
     } catch (NativeLibraryNotLoadedException e) {
-      this.sstDumptoolExecService.ifPresent(exec ->
+      this.sstDumpToolExecService.ifPresent(exec ->
           closeExecutorService(exec, "SstDumpToolExecutor"));
     }
     return Optional.empty();
@@ -346,12 +350,12 @@ public class SnapshotDiffManager implements AutoCloseable 
{
     String volumeId = 
String.valueOf(omMetadataManager.getVolumeId(volumeName));
     String bucketId = String.valueOf(
         omMetadataManager.getBucketId(volumeName, bucketName));
-    tablePrefixes.put(OmMetadataManagerImpl.KEY_TABLE,
-        OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName
-        + OM_KEY_PREFIX);
-    tablePrefixes.put(OmMetadataManagerImpl.FILE_TABLE,
+    tablePrefixes.put(KEY_TABLE,
+        OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName +
+            OM_KEY_PREFIX);
+    tablePrefixes.put(FILE_TABLE,
         OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX);
-    tablePrefixes.put(OmMetadataManagerImpl.DIRECTORY_TABLE,
+    tablePrefixes.put(DIRECTORY_TABLE,
         OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX);
     return tablePrefixes;
   }
@@ -376,8 +380,10 @@ public class SnapshotDiffManager implements AutoCloseable {
         getTablePrefixes(snapshotOMMM, volumeName, bucketName));
   }
 
-  private Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
-          List<String> tablesToLookUp) throws RocksDBException {
+  @VisibleForTesting
+  protected Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
+                                                  List<String> tablesToLookUp)
+      throws RocksDBException {
     return RdbUtil.getSSTFilesForComparison(snapshot
         .getMetadataManager().getStore().getDbLocation()
         .getPath(), tablesToLookUp);
@@ -556,20 +562,40 @@ public class SnapshotDiffManager implements AutoCloseable 
{
   }
 
   @VisibleForTesting
-  SnapshotDiffReportOzone createPageResponse(final SnapshotDiffJob snapDiffJob,
-      final String volumeName, final String bucketName,
-      final String fromSnapshotName, final String toSnapshotName,
-      final int index, final int pageSize) throws IOException {
+  SnapshotDiffReportOzone createPageResponse(
+      final SnapshotDiffJob snapDiffJob,
+      final String volumeName,
+      final String bucketName,
+      final String fromSnapshotName,
+      final String toSnapshotName,
+      final int index,
+      final int pageSize
+  ) throws IOException {
     if (index < 0 || pageSize <= 0) {
       throw new IllegalArgumentException(String.format(
           "Index should be a number >= 0. Given index %d. Page size " +
-          "should be a positive number > 0. Given page size is %d",
+              "should be a positive number > 0. Given page size is %d",
           index, pageSize));
     }
-    List<DiffReportEntry> diffReportList = new ArrayList<>();
 
     OFSPath path = getSnapshotRootPath(volumeName, bucketName);
 
+    Pair<List<DiffReportEntry>, String> pageResponse =
+        createPageResponse(snapDiffJob, index, pageSize);
+    List<DiffReportEntry> diffReportList = pageResponse.getLeft();
+    String tokenString = pageResponse.getRight();
+
+    return new SnapshotDiffReportOzone(path.toString(), volumeName, bucketName,
+        fromSnapshotName, toSnapshotName, diffReportList, tokenString);
+  }
+
+  Pair<List<DiffReportEntry>, String> createPageResponse(
+      final SnapshotDiffJob snapDiffJob,
+      final int index,
+      final int pageSize
+  ) throws IOException {
+    List<DiffReportEntry> diffReportList = new ArrayList<>();
+
     boolean hasMoreEntries = true;
 
     byte[] lowerIndex = codecRegistry.asRawData(getReportKeyForIndex(
@@ -597,9 +623,7 @@ public class SnapshotDiffManager implements AutoCloseable {
     String nextTokenString = hasMoreEntries ? String.valueOf(idx) : null;
 
     checkReportsIntegrity(snapDiffJob, index, diffReportList.size());
-
-    return new SnapshotDiffReportOzone(path.toString(), volumeName, bucketName,
-        fromSnapshotName, toSnapshotName, diffReportList, nextTokenString);
+    return Pair.of(diffReportList, nextTokenString);
   }
 
   /**
@@ -608,9 +632,10 @@ public class SnapshotDiffManager implements AutoCloseable {
    * If check fails, it marks the job failed so that it is GC-ed by clean up
    * service and throws the exception to client.
    */
-  private void checkReportsIntegrity(final SnapshotDiffJob diffJob,
-                                     final int pageStartIdx,
-                                     final int numberOfEntriesInPage)
+  @VisibleForTesting
+  void checkReportsIntegrity(final SnapshotDiffJob diffJob,
+                             final int pageStartIdx,
+                             final int numberOfEntriesInPage)
       throws IOException {
     if ((pageStartIdx >= diffJob.getTotalDiffEntries() &&
         numberOfEntriesInPage != 0) || (pageStartIdx <
@@ -757,7 +782,8 @@ public class SnapshotDiffManager implements AutoCloseable {
     return snapDiffJob;
   }
 
-  private boolean areDiffJobAndSnapshotsActive(
+  @VisibleForTesting
+  boolean areDiffJobAndSnapshotsActive(
       final String volumeName, final String bucketName,
       final String fromSnapshotName, final String toSnapshotName)
       throws IOException {
@@ -780,13 +806,14 @@ public class SnapshotDiffManager implements AutoCloseable 
{
   }
 
   @SuppressWarnings("methodlength")
-  private void generateSnapshotDiffReport(final String jobKey,
-                                          final String jobId,
-                                          final String volumeName,
-                                          final String bucketName,
-                                          final String fromSnapshotName,
-                                          final String toSnapshotName,
-                                          final boolean forceFullDiff) {
+  @VisibleForTesting
+  void generateSnapshotDiffReport(final String jobKey,
+                                  final String jobId,
+                                  final String volumeName,
+                                  final String bucketName,
+                                  final String fromSnapshotName,
+                                  final String toSnapshotName,
+                                  final boolean forceFullDiff) {
     LOG.info("Started snap diff report generation for volume: {} " +
             "bucket: {}, fromSnapshot: {} and toSnapshot: {}",
         volumeName, bucketName, fromSnapshotName, toSnapshotName);
@@ -1043,13 +1070,14 @@ public class SnapshotDiffManager implements 
AutoCloseable {
     }
   }
 
+  @VisibleForTesting
   @SuppressWarnings("checkstyle:ParameterNumber")
   void addToObjectIdMap(Table<String, ? extends WithParentObjectId> fsTable,
       Table<String, ? extends WithParentObjectId> tsTable,
       Set<String> deltaFiles, boolean nativeRocksToolsLoaded,
       PersistentMap<byte[], byte[]> oldObjIdToKeyMap,
       PersistentMap<byte[], byte[]> newObjIdToKeyMap,
-      final PersistentMap<byte[], SnapshotDiffObject> objectIdToDiffObject,
+      PersistentMap<byte[], SnapshotDiffObject> objectIdToDiffObject,
       Optional<Set<Long>> oldParentIds,
       Optional<Set<Long>> newParentIds,
       Map<String, String> tablePrefixes) throws IOException,
@@ -1059,7 +1087,7 @@ public class SnapshotDiffManager implements AutoCloseable 
{
     }
     String tablePrefix = getTablePrefix(tablePrefixes, fsTable.getName());
     boolean isDirectoryTable =
-        fsTable.getName().equals(OmMetadataManagerImpl.DIRECTORY_TABLE);
+        fsTable.getName().equals(DIRECTORY_TABLE);
     ManagedSstFileReader sstFileReader = new ManagedSstFileReader(deltaFiles);
     validateEstimatedKeyChangesAreInLimits(sstFileReader);
 
@@ -1147,11 +1175,16 @@ public class SnapshotDiffManager implements 
AutoCloseable {
     return keyInfo.getKeyName();
   }
 
+  @VisibleForTesting
   @SuppressWarnings("checkstyle:ParameterNumber")
-  Set<String> getDeltaFiles(OmSnapshot fromSnapshot, OmSnapshot toSnapshot,
-                            List<String> tablesToLookUp, SnapshotInfo fsInfo,
-                            SnapshotInfo tsInfo, boolean useFullDiff,
-                            Map<String, String> tablePrefixes, String diffDir)
+  Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
+                            OmSnapshot toSnapshot,
+                            List<String> tablesToLookUp,
+                            SnapshotInfo fsInfo,
+                            SnapshotInfo tsInfo,
+                            boolean useFullDiff,
+                            Map<String, String> tablePrefixes,
+                            String diffDir)
       throws RocksDBException, IOException {
     // TODO: [SNAPSHOT] Refactor the parameter list
     final Set<String> deltaFiles = new HashSet<>();
@@ -1316,22 +1349,19 @@ public class SnapshotDiffManager implements 
AutoCloseable {
             String key = resolveAbsolutePath(isFSOBucket, newParentIdPathMap,
                 newKeyName);
             DiffReportEntry entry =
-                SnapshotDiffReportOzone.getDiffReportEntry(DiffType.CREATE,
-                    key);
+                SnapshotDiffReportOzone.getDiffReportEntry(CREATE, key);
             createDiffs.add(codecRegistry.asRawData(entry));
           } else if (newKeyName == null) { // Key Deleted.
             String key = resolveAbsolutePath(isFSOBucket, oldParentIdPathMap,
                 oldKeyName);
             DiffReportEntry entry =
-                SnapshotDiffReportOzone.getDiffReportEntry(DiffType.DELETE,
-                    key);
+                SnapshotDiffReportOzone.getDiffReportEntry(DELETE, key);
             deleteDiffs.add(codecRegistry.asRawData(entry));
           } else if (Arrays.equals(oldKeyName, newKeyName)) { // Key modified.
             String key = resolveAbsolutePath(isFSOBucket, newParentIdPathMap,
                 newKeyName);
             DiffReportEntry entry =
-                SnapshotDiffReportOzone.getDiffReportEntry(DiffType.MODIFY,
-                    key);
+                SnapshotDiffReportOzone.getDiffReportEntry(MODIFY, key);
             modifyDiffs.add(codecRegistry.asRawData(entry));
           } else { // Key Renamed.
             String oldKey = resolveAbsolutePath(isFSOBucket, 
oldParentIdPathMap,
@@ -1339,8 +1369,8 @@ public class SnapshotDiffManager implements AutoCloseable 
{
             String newKey = resolveAbsolutePath(isFSOBucket, 
newParentIdPathMap,
                 newKeyName);
             renameDiffs.add(codecRegistry.asRawData(
-                SnapshotDiffReportOzone.getDiffReportEntry(DiffType.RENAME,
-                    oldKey, newKey)));
+                SnapshotDiffReportOzone.getDiffReportEntry(RENAME, oldKey,
+                    newKey)));
 
             // Check if block location is same or not. If it is not same,
             // key must have been overridden as well.
@@ -1349,8 +1379,7 @@ public class SnapshotDiffManager implements AutoCloseable 
{
               // Here, oldKey name is returned as modified. Modified key name 
is
               // based on base snapshot (from snapshot).
               renameDiffs.add(codecRegistry.asRawData(
-                  SnapshotDiffReportOzone.getDiffReportEntry(DiffType.MODIFY,
-                      oldKey)));
+                  SnapshotDiffReportOzone.getDiffReportEntry(MODIFY, oldKey)));
             }
           }
 
@@ -1503,9 +1532,10 @@ public class SnapshotDiffManager implements 
AutoCloseable {
     snapDiffJobTable.put(jobKey, snapshotDiffJob);
   }
 
-  private BucketLayout getBucketLayout(final String volume,
-                                       final String bucket,
-                                       final OMMetadataManager mManager)
+  @VisibleForTesting
+  protected BucketLayout getBucketLayout(final String volume,
+                                         final String bucket,
+                                         final OMMetadataManager mManager)
       throws IOException {
     final String bucketTableKey = mManager.getBucketKey(volume, bucket);
     return mManager.getBucketTable().get(bucketTableKey).getBucketLayout();
@@ -1541,7 +1571,7 @@ public class SnapshotDiffManager implements AutoCloseable 
{
    * check if the given key is in the bucket specified by tablePrefix map.
    */
   boolean isKeyInBucket(String key, Map<String, String> tablePrefixes,
-      String tableName) {
+                        String tableName) {
     return key.startsWith(getTablePrefix(tablePrefixes, tableName));
   }
 
@@ -1556,7 +1586,8 @@ public class SnapshotDiffManager implements AutoCloseable 
{
    * When client re-submits previously queued job, workflow will pick it and
    * execute it.
    */
-  private void loadJobsOnStartUp() {
+  @VisibleForTesting
+  void loadJobsOnStartUp() {
 
     try (ClosableIterator<Map.Entry<String, SnapshotDiffJob>> iterator =
              snapDiffJobTable.iterator()) {
@@ -1589,7 +1620,7 @@ public class SnapshotDiffManager implements AutoCloseable 
{
     if (snapDiffExecutor != null) {
       closeExecutorService(snapDiffExecutor, "SnapDiffExecutor");
     }
-    this.sstDumptoolExecService.ifPresent(exec ->
+    this.sstDumpToolExecService.ifPresent(exec ->
         closeExecutorService(exec, "SstDumpToolExecutor"));
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index fa7d99e5a2..46f1773f7b 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -24,20 +24,24 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
-import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -48,29 +52,36 @@ import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import 
org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject.SnapshotDiffObjectBuilder;
 import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import 
org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject.SnapshotDiffObjectBuilder;
+import 
org.apache.hadoop.ozone.om.snapshot.SnapshotTestUtils.StubbedPersistentMap;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
-import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ClosableIterator;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.ozone.rocksdb.util.ManagedSstFileReader;
 import org.apache.ozone.rocksdb.util.RdbUtil;
 import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.apache.ozone.rocksdiff.RocksDiffUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.MockedConstruction;
 import org.mockito.MockedStatic;
@@ -79,151 +90,315 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 import org.mockito.stubbing.Answer;
+import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
+
+import static 
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
+import static 
org.apache.hadoop.ozone.om.OmSnapshotManager.SNAP_DIFF_JOB_TABLE_NAME;
+import static 
org.apache.hadoop.ozone.om.OmSnapshotManager.SNAP_DIFF_REPORT_TABLE_NAME;
+import static org.apache.hadoop.ozone.om.helpers.BucketLayout.LEGACY;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.getTableKey;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone.getDiffReportEntryCodec;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.FAILED;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.any;
 
 /**
- * Test class for SnapshotDiffManager Class.
+ * Tests for SnapshotDiffManager.
  */
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class TestSnapshotDiffManager {
-
-  @Mock
-  private ManagedRocksDB snapdiffDB;
-
+  private static final String VOLUME_NAME = "volume";
+  private static final String BUCKET_NAME = "bucket";
+  private ManagedRocksDB db;
+  private ManagedDBOptions dbOptions;
+  private ManagedColumnFamilyOptions columnFamilyOptions;
+  private List<ColumnFamilyHandle> columnFamilyHandles;
+  private ColumnFamilyHandle snapDiffJobTable;
+  private ColumnFamilyHandle snapDiffReportTable;
+  private SnapshotDiffManager snapshotDiffManager;
+  private final List<JobStatus> jobStatuses = Arrays.asList(QUEUED, 
IN_PROGRESS,
+      DONE, REJECTED, FAILED);
+
+  private SnapshotInfo snapshotInfo;
+  private final List<String> snapshotNames = new ArrayList<>();
+  private final List<SnapshotInfo> snapshotInfoList = new ArrayList<>();
+  private final List<SnapshotDiffJob> snapDiffJobs = new ArrayList<>();
+  @TempDir
+  private File dbDir;
   @Mock
   private RocksDBCheckpointDiffer differ;
-
+  @Mock
+  private OMMetadataManager omMetadataManager;
   @Mock
   private OzoneManager ozoneManager;
-
-  private LoadingCache<String, OmSnapshot> snapshotCache;
-
   @Mock
-  private ColumnFamilyHandle snapdiffJobCFH;
-
+  private OzoneConfiguration configuration;
   @Mock
-  private ColumnFamilyHandle snapdiffReportCFH;
-
+  private Table<String, SnapshotInfo> snapshotInfoTable;
   @Mock
-  private ManagedColumnFamilyOptions columnFamilyOptions;
-
+  private Table<String, OmBucketInfo> bucketInfoTable;
+  @Mock
+  private Table<String, OmKeyInfo> keyInfoTable;
+  @Mock
+  private OmBucketInfo omBucketInfo;
   @Mock
-  private RocksDB rocksDB;
+  private RDBStore dbStore;
+
+  private LoadingCache<String, OmSnapshot> snapshotCache;
 
   @Mock
   private RocksIterator jobTableIterator;
 
   private static CodecRegistry codecRegistry;
 
+  private final BiFunction<SnapshotInfo, SnapshotInfo, String>
+      generateSnapDiffJobKey =
+          (SnapshotInfo fromSnapshotInfo, SnapshotInfo toSnapshotInfo) ->
+              fromSnapshotInfo.getSnapshotId() + DELIMITER +
+                  toSnapshotInfo.getSnapshotId();
+
   @BeforeAll
   public static void initCodecRegistry() {
-    // Integers are used for indexing persistent list.
     codecRegistry = CodecRegistry.newBuilder()
-        .addCodec(SnapshotDiffReportOzone.DiffReportEntry.class,
-            SnapshotDiffReportOzone.getDiffReportEntryCodec())
-        .addCodec(SnapshotDiffJob.class, SnapshotDiffJob.getCodec()).build();
+        .addCodec(DiffReportEntry.class, getDiffReportEntryCodec())
+        .addCodec(SnapshotDiffJob.class, SnapshotDiffJob.getCodec())
+        .build();
   }
 
-  private DBStore getMockedDBStore(String dbStorePath) {
-    DBStore dbStore = mock(DBStore.class);
-    when(dbStore.getDbLocation()).thenReturn(new File(dbStorePath));
-    return dbStore;
-  }
+  @BeforeEach
+  public void init() throws RocksDBException, IOException, ExecutionException {
+    ExitUtils.disableSystemExit();
+    ExitUtil.disableSystemExit();
+
+    dbOptions = new ManagedDBOptions();
+    dbOptions.setCreateIfMissing(true);
+    columnFamilyOptions = new ManagedColumnFamilyOptions();
+
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        Collections.singletonList(new ColumnFamilyDescriptor(
+            StringUtils.string2Bytes(DEFAULT_COLUMN_FAMILY_NAME),
+            columnFamilyOptions));
+
+    columnFamilyHandles = new ArrayList<>();
+
+    db = ManagedRocksDB.open(dbOptions, dbDir.getAbsolutePath(),
+        columnFamilyDescriptors, columnFamilyHandles);
+
+    snapDiffJobTable = db.get().createColumnFamily(
+        new ColumnFamilyDescriptor(
+            StringUtils.string2Bytes(SNAP_DIFF_JOB_TABLE_NAME),
+            columnFamilyOptions));
+    snapDiffReportTable = db.get().createColumnFamily(
+        new ColumnFamilyDescriptor(
+            StringUtils.string2Bytes(SNAP_DIFF_REPORT_TABLE_NAME),
+            columnFamilyOptions));
+
+    columnFamilyHandles.add(snapDiffJobTable);
+    columnFamilyHandles.add(snapDiffReportTable);
+
+    String snapshotNamePrefix = "snap-";
+    String snapshotPath = "snapshotPath";
+    String snapshotCheckpointDir = "snapshotCheckpointDir";
+    UUID baseSnapshotId = UUID.randomUUID();
+    String baseSnapshotName = snapshotNamePrefix + baseSnapshotId;
+    snapshotInfo = new SnapshotInfo.Builder()
+        .setSnapshotId(baseSnapshotId)
+        .setVolumeName(VOLUME_NAME)
+        .setBucketName(BUCKET_NAME)
+        .setName(baseSnapshotName)
+        .setSnapshotPath(snapshotPath)
+        .setCheckpointDir(snapshotCheckpointDir)
+        .build();
 
-  private OmSnapshot getMockedOmSnapshot(String snapshot) {
-    OmSnapshot omSnapshot = Mockito.mock(OmSnapshot.class);
-    OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
-    DBStore dbStore = getMockedDBStore(snapshot);
-    Mockito.when(omSnapshot.getName()).thenReturn(snapshot);
-    Mockito.when(omSnapshot.getMetadataManager()).thenReturn(metadataManager);
-    Mockito.when(metadataManager.getStore()).thenReturn(dbStore);
-    return omSnapshot;
-  }
+    for (JobStatus jobStatus : jobStatuses) {
+      UUID targetSnapshotId = UUID.randomUUID();
+      String targetSnapshotName = snapshotNamePrefix +
+          targetSnapshotId;
+      SnapshotInfo targetSnapshot = new SnapshotInfo.Builder()
+          .setSnapshotId(targetSnapshotId)
+          .setVolumeName(VOLUME_NAME)
+          .setBucketName(BUCKET_NAME)
+          .setName(targetSnapshotName)
+          .setSnapshotPath(snapshotPath)
+          .setCheckpointDir(snapshotCheckpointDir)
+          .build();
+
+      SnapshotDiffJob diffJob = new SnapshotDiffJob(System.currentTimeMillis(),
+          UUID.randomUUID().toString(), jobStatus, VOLUME_NAME, BUCKET_NAME,
+          baseSnapshotName, targetSnapshotName, false, 0);
+
+      snapshotNames.add(targetSnapshotName);
+      snapshotInfoList.add(targetSnapshot);
+      snapDiffJobs.add(diffJob);
+    }
+
+    String bucketTableKey =
+        OM_KEY_PREFIX + VOLUME_NAME + OM_KEY_PREFIX + BUCKET_NAME;
+
+    when(configuration
+        .getTimeDuration(OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME,
+            OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME_DEFAULT,
+            TimeUnit.MILLISECONDS))
+        .thenReturn(OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME_DEFAULT);
+    when(configuration
+        .getBoolean(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF,
+            OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT))
+        .thenReturn(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT);
+    when(configuration
+        .getLong(OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB,
+            
OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB_DEFAULT
+        ))
+        .thenReturn(
+            
OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB_DEFAULT
+        );
+    when(configuration
+        .getInt(OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE,
+            OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE_DEFAULT))
+        .thenReturn(OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE_DEFAULT);
+    when(configuration
+        .getInt(OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE,
+            OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT))
+        .thenReturn(OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT);
+    when(configuration
+        .getStorageSize(OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE,
+            OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT,
+            StorageUnit.BYTES))
+        .thenReturn(FileUtils.ONE_KB_BI.doubleValue());
+
+    for (int i = 0; i < jobStatuses.size(); i++) {
+      when(snapshotInfoTable.get(getTableKey(VOLUME_NAME, BUCKET_NAME,
+          snapshotNames.get(i)))).thenReturn(snapshotInfoList.get(i));
+    }
+
+    when(snapshotInfoTable.get(getTableKey(VOLUME_NAME, BUCKET_NAME,
+        baseSnapshotName))).thenReturn(snapshotInfo);
+
+    when(dbStore.getDbLocation()).thenReturn(dbDir);
+    when(dbStore.getSnapshotMetadataDir()).thenReturn(dbDir.getAbsolutePath());
+    when(omBucketInfo.getBucketLayout()).thenReturn(LEGACY);
+    when(bucketInfoTable.get(bucketTableKey)).thenReturn(omBucketInfo);
+    when(omMetadataManager.getStore()).thenReturn(dbStore);
+    when(omMetadataManager.getSnapshotInfoTable())
+        .thenReturn(snapshotInfoTable);
+    when(omMetadataManager.getBucketTable()).thenReturn(bucketInfoTable);
+    when(omMetadataManager.getBucketKey(VOLUME_NAME, BUCKET_NAME))
+        .thenReturn(bucketTableKey);
+    when(omMetadataManager.getKeyTable(LEGACY)).thenReturn(keyInfoTable);
+    when(ozoneManager.getConfiguration()).thenReturn(configuration);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
 
-  private SnapshotDiffManager getMockedSnapshotDiffManager(int cacheSize)
-      throws IOException {
-    when(snapdiffDB.get()).thenReturn(rocksDB);
-    when(rocksDB.newIterator(snapdiffJobCFH))
-        .thenReturn(jobTableIterator);
-    when(rocksDB.newIterator(Mockito.eq(snapdiffJobCFH),
-            Mockito.any(ReadOptions.class)))
-        .thenReturn(jobTableIterator);
     CacheLoader<String, OmSnapshot> loader =
         new CacheLoader<String, OmSnapshot>() {
+          @NotNull
           @Override
-          public OmSnapshot load(String key) {
+          public OmSnapshot load(@NotNull String key) {
             return getMockedOmSnapshot(key);
           }
         };
-    snapshotCache = CacheBuilder.newBuilder()
-        .maximumSize(cacheSize)
-        .build(loader);
-    Mockito.when(ozoneManager.getConfiguration())
-        .thenReturn(new OzoneConfiguration());
-    OMMetadataManager mockedMetadataManager =
-        Mockito.mock(OMMetadataManager.class);
-    RDBStore mockedRDBStore = Mockito.mock(RDBStore.class);
-    Path diffDir = Files.createTempDirectory("snapdiff_dir");
-    Mockito.when(mockedRDBStore.getSnapshotMetadataDir())
-        .thenReturn(diffDir.toString());
-    Mockito.when(mockedMetadataManager.getStore()).thenReturn(mockedRDBStore);
-    Mockito.when(ozoneManager.getMetadataManager())
-        .thenReturn(mockedMetadataManager);
-    SnapshotDiffManager snapshotDiffManager = Mockito.spy(
-        new SnapshotDiffManager(snapdiffDB, differ, ozoneManager, 
snapshotCache,
-            snapdiffJobCFH, snapdiffReportCFH, columnFamilyOptions,
-            codecRegistry));
-    PersistentMap<String, SnapshotDiffJob> snapDiffJobTable =
-        new SnapshotTestUtils.StubbedPersistentMap<>();
-    HddsWhiteboxTestUtils.setInternalState(snapshotDiffManager,
-        "snapDiffJobTable", snapDiffJobTable);
-    return snapshotDiffManager;
+
+    snapshotCache = CacheBuilder.newBuilder().maximumSize(10).build(loader);
+
+    snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager,
+        snapshotCache, snapDiffJobTable, snapDiffReportTable,
+        columnFamilyOptions, codecRegistry);
+  }
+
+  @AfterEach
+  public void tearDown() {
+    if (columnFamilyHandles != null) {
+      columnFamilyHandles.forEach(IOUtils::closeQuietly);
+    }
+
+    IOUtils.closeQuietly(db);
+    IOUtils.closeQuietly(dbOptions);
+    IOUtils.closeQuietly(columnFamilyOptions);
+    IOUtils.closeQuietly(snapshotDiffManager);
+  }
+
+  private OmSnapshot getMockedOmSnapshot(String snapshot) {
+    OmSnapshot omSnapshot = mock(OmSnapshot.class);
+    when(omSnapshot.getName()).thenReturn(snapshot);
+    when(omSnapshot.getMetadataManager()).thenReturn(omMetadataManager);
+    when(omMetadataManager.getStore()).thenReturn(dbStore);
+    return omSnapshot;
   }
 
   private SnapshotInfo getMockedSnapshotInfo(UUID snapshotId) {
-    SnapshotInfo snapshotInfo = mock(SnapshotInfo.class);
-    Mockito.when(snapshotInfo.getSnapshotId()).thenReturn(snapshotId);
-    return snapshotInfo;
+    SnapshotInfo snapInfo = mock(SnapshotInfo.class);
+    when(snapInfo.getSnapshotId()).thenReturn(snapshotId);
+    return snapInfo;
   }
 
   @ParameterizedTest
   @ValueSource(ints = {1, 2, 5, 10, 100, 1000, 10000})
   public void testGetDeltaFilesWithDag(int numberOfFiles)
       throws ExecutionException, RocksDBException, IOException {
-
-    SnapshotDiffManager snapshotDiffManager = getMockedSnapshotDiffManager(10);
     UUID snap1 = UUID.randomUUID();
     UUID snap2 = UUID.randomUUID();
 
@@ -231,18 +406,23 @@ public class TestSnapshotDiffManager {
     Set<String> randomStrings = IntStream.range(0, numberOfFiles)
         .mapToObj(i -> RandomStringUtils.randomAlphabetic(10))
         .collect(Collectors.toSet());
-    Mockito.when(differ.getSSTDiffListWithFullPath(Mockito.any(),
-        Mockito.any(), Mockito.eq(diffDir)))
-        .thenReturn(Lists.newArrayList(randomStrings));
+
+    when(differ.getSSTDiffListWithFullPath(
+        any(DifferSnapshotInfo.class),
+        any(DifferSnapshotInfo.class),
+        eq(diffDir))
+    ).thenReturn(Lists.newArrayList(randomStrings));
+
     SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
-    SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1);
-    Mockito.when(jobTableIterator.isValid()).thenReturn(false);
+    SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap2);
+    when(jobTableIterator.isValid()).thenReturn(false);
     Set<String> deltaFiles = snapshotDiffManager.getDeltaFiles(
         snapshotCache.get(snap1.toString()),
         snapshotCache.get(snap2.toString()),
-        Arrays.asList("cf1", "cf2"), fromSnapshotInfo, toSnapshotInfo, false,
-        Collections.EMPTY_MAP, diffDir);
-    Assertions.assertEquals(randomStrings, deltaFiles);
+        Arrays.asList("cf1", "cf2"), fromSnapshotInfo,
+        toSnapshotInfo, false,
+        Collections.emptyMap(), diffDir);
+    assertEquals(randomStrings, deltaFiles);
   }
 
   @ParameterizedTest
@@ -257,9 +437,9 @@ public class TestSnapshotDiffManager {
          MockedStatic<RocksDiffUtils> mockedRocksDiffUtils =
              Mockito.mockStatic(RocksDiffUtils.class)) {
       Set<String> deltaStrings = new HashSet<>();
+
       mockedRdbUtil.when(
-              () -> RdbUtil.getSSTFilesForComparison(Matchers.anyString(),
-                  Matchers.anyList()))
+              () -> RdbUtil.getSSTFilesForComparison(anyString(), anyList()))
           .thenAnswer((Answer<Set<String>>) invocation -> {
             Set<String> retVal = IntStream.range(0, numberOfFiles)
                 .mapToObj(i -> RandomStringUtils.randomAlphabetic(10))
@@ -267,39 +447,44 @@ public class TestSnapshotDiffManager {
             deltaStrings.addAll(retVal);
             return retVal;
           });
-      mockedRocksDiffUtils.when(() -> RocksDiffUtils.filterRelevantSstFiles(
-              Matchers.anySet(), Matchers.anyMap()))
+
+      mockedRocksDiffUtils.when(() ->
+              RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap()))
           .thenAnswer((Answer<Void>) invocationOnMock -> {
             invocationOnMock.getArgument(0, Set.class).stream()
                 .findAny().ifPresent(val -> {
-                  Assertions.assertTrue(deltaStrings.contains(val));
+                  assertTrue(deltaStrings.contains(val));
                   invocationOnMock.getArgument(0, Set.class).remove(val);
                   deltaStrings.remove(val);
                 });
             return null;
           });
-      SnapshotDiffManager snapshotDiffManager =
-          getMockedSnapshotDiffManager(10);
+      SnapshotDiffManager spy = spy(snapshotDiffManager);
       UUID snap1 = UUID.randomUUID();
       UUID snap2 = UUID.randomUUID();
       if (!useFullDiff) {
         Set<String> randomStrings = Collections.emptySet();
-        Mockito.when(differ.getSSTDiffListWithFullPath(
-            Mockito.any(DifferSnapshotInfo.class),
-            Mockito.any(DifferSnapshotInfo.class),
-            Matchers.anyString()))
+        when(differ.getSSTDiffListWithFullPath(
+            any(DifferSnapshotInfo.class),
+            any(DifferSnapshotInfo.class),
+            anyString()))
             .thenReturn(Lists.newArrayList(randomStrings));
       }
+
       SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
       SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1);
-      Mockito.when(jobTableIterator.isValid()).thenReturn(false);
-      Set<String> deltaFiles = snapshotDiffManager.getDeltaFiles(
+      when(jobTableIterator.isValid()).thenReturn(false);
+      Set<String> deltaFiles = spy.getDeltaFiles(
           snapshotCache.get(snap1.toString()),
           snapshotCache.get(snap2.toString()),
-          Arrays.asList("cf1", "cf2"), fromSnapshotInfo, toSnapshotInfo, false,
-          Collections.EMPTY_MAP, Files.createTempDirectory("snapdiff_dir")
-              .toAbsolutePath().toString());
-      Assertions.assertEquals(deltaStrings, deltaFiles);
+          Arrays.asList("cf1", "cf2"),
+          fromSnapshotInfo,
+          toSnapshotInfo,
+          false,
+          Collections.emptyMap(),
+          Files.createTempDirectory("snapdiff_dir").toAbsolutePath()
+              .toString());
+      assertEquals(deltaStrings, deltaFiles);
     }
   }
 
@@ -307,9 +492,9 @@ public class TestSnapshotDiffManager {
       Map<String, WithParentObjectId> map, String tableName)
       throws IOException {
     Table<String, ? extends WithParentObjectId> mocked = mock(Table.class);
-    Mockito.when(mocked.get(Matchers.any()))
+    when(mocked.get(any()))
         .thenAnswer(invocation -> map.get(invocation.getArgument(0)));
-    Mockito.when(mocked.getName()).thenReturn(tableName);
+    when(mocked.getName()).thenReturn(tableName);
     return mocked;
   }
 
@@ -331,17 +516,12 @@ public class TestSnapshotDiffManager {
   /**
    * Test mocks the SSTFileReader to return object Ids from 0-50
    * when not reading tombstones & Object Ids 0-100 when reading tombstones.
-   * Creating a mock snapshot table where the from Snapshot Table contains
-   * Object Ids in the range 0-25 & 50-100 and to Snaphshot Table contains data
+   * Creating a mock snapshot table where the fromSnapshot Table contains
+   * Object Ids in the range 0-25 & 50-100 and toSnapshot Table contains data
    * with object Ids in the range 0-50.
    * Function should return 25-50 in the new Persistent map.
    * In the case of reading tombstones old Snapshot Persistent map should have
    * object Ids in the range 50-100 & should be empty otherwise
-   *
-   * @param nativeLibraryLoaded
-   * @param snapshotTableName
-   * @throws NativeLibraryNotLoadedException
-   * @throws IOException
    */
   @SuppressFBWarnings({"DLS_DEAD_LOCAL_STORE",
       "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"})
@@ -355,368 +535,303 @@ public class TestSnapshotDiffManager {
   public void testObjectIdMapWithTombstoneEntries(boolean nativeLibraryLoaded,
                                                   String snapshotTableName)
       throws NativeLibraryNotLoadedException, IOException, RocksDBException {
-    // Mocking SST file with keys in SST file including tombstones
-    Set<String> keysWithTombstones = IntStream.range(0, 100)
+    Set<String> keysIncludingTombstones = IntStream.range(0, 100)
         .boxed().map(i -> (i + 100) + "/key" + i).collect(Collectors.toSet());
     // Mocking SST file with keys in SST file excluding tombstones
-    Set<String> keys = IntStream.range(0, 50).boxed()
+    Set<String> keysExcludingTombstones = IntStream.range(0, 50).boxed()
         .map(i -> (i + 100) + "/key" + i).collect(Collectors.toSet());
+
     // Mocking SSTFileReader functions to return the above keys list.
     try (MockedConstruction<ManagedSstFileReader> mockedSSTFileReader =
              Mockito.mockConstruction(ManagedSstFileReader.class,
                  (mock, context) -> {
-                   Mockito.when(mock.getKeyStreamWithTombstone(Matchers.any()))
-                       .thenReturn(keysWithTombstones.stream());
-                   Mockito.when(mock.getKeyStream())
-                       .thenReturn(keys.stream());
+                   when(mock.getKeyStreamWithTombstone(any()))
+                       .thenReturn(keysIncludingTombstones.stream());
+                   when(mock.getKeyStream())
+                       .thenReturn(keysExcludingTombstones.stream());
                  });
          MockedConstruction<ManagedSSTDumpTool> mockedSSTDumpTool =
              Mockito.mockConstruction(ManagedSSTDumpTool.class,
                  (mock, context) -> {
                  })
     ) {
-      //
       Map<String, WithParentObjectId> toSnapshotTableMap =
           IntStream.concat(IntStream.range(0, 25), IntStream.range(50, 100))
               .boxed().collect(Collectors.toMap(i -> (i + 100) + "/key" + i,
                   i -> getKeyInfo(i, i, i + 100,
                       snapshotTableName)));
-      // Mocking To snapshot table containing list of keys b/w 0-25, 50-100
       Table<String, ? extends WithParentObjectId> toSnapshotTable =
           getMockedTable(toSnapshotTableMap, snapshotTableName);
-      // Mocking To snapshot table containing list of keys b/w 0-50
+
       Map<String, WithParentObjectId> fromSnapshotTableMap =
           IntStream.range(0, 50)
               .boxed().collect(Collectors.toMap(i -> (i + 100) + "/key" + i,
                   i -> getKeyInfo(i, i, i + 100, snapshotTableName)));
-      // Expected Diff 25-50 are newly created keys & keys b/w are deleted,
-      // when reding keys with tombstones the keys would be added to
-      // objectIdsToBeChecked otherwise it wouldn't be added
+
       Table<String, ? extends WithParentObjectId> fromSnapshotTable =
           getMockedTable(fromSnapshotTableMap, snapshotTableName);
-      SnapshotDiffManager snapshotDiffManager =
-          getMockedSnapshotDiffManager(10);
-      // Mocking to filter even keys in bucket.
-      // Odd keys should be filtered out in the diff.
-      Mockito.doAnswer((Answer<Boolean>) invocationOnMock ->
-          Integer.parseInt(invocationOnMock.getArgument(0, String.class)
-              .substring(7)) % 2 == 0).when(snapshotDiffManager)
-          .isKeyInBucket(Matchers.anyString(), Matchers.anyMap(),
-              Matchers.anyString());
+
+      snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager,
+          snapshotCache, snapDiffJobTable, snapDiffReportTable,
+          columnFamilyOptions, codecRegistry);
+      SnapshotDiffManager spy = spy(snapshotDiffManager);
+
+      doAnswer(invocation -> {
+            String[] split = invocation.getArgument(0, 
String.class).split("/");
+            String keyName = split[split.length - 1];
+            return Integer.parseInt(keyName.substring(3)) % 2 == 0;
+          }
+      ).when(spy).isKeyInBucket(anyString(), anyMap(), anyString());
+
       PersistentMap<byte[], byte[]> oldObjectIdKeyMap =
-          new SnapshotTestUtils.StubbedPersistentMap<>();
+          new StubbedPersistentMap<>();
       PersistentMap<byte[], byte[]> newObjectIdKeyMap =
           new SnapshotTestUtils.StubbedPersistentMap<>();
       PersistentMap<byte[], SnapshotDiffObject> objectIdsToCheck =
           new SnapshotTestUtils.StubbedPersistentMap<>();
+
       Set<Long> oldParentIds = Sets.newHashSet();
       Set<Long> newParentIds = Sets.newHashSet();
-      snapshotDiffManager.addToObjectIdMap(toSnapshotTable,
+
+      spy.addToObjectIdMap(toSnapshotTable,
           fromSnapshotTable, Sets.newHashSet("dummy.sst"),
           nativeLibraryLoaded, oldObjectIdKeyMap, newObjectIdKeyMap,
-          objectIdsToCheck, Optional.ofNullable(oldParentIds),
-          Optional.ofNullable(newParentIds),
+          objectIdsToCheck, Optional.of(oldParentIds),
+          Optional.of(newParentIds),
           ImmutableMap.of(OmMetadataManagerImpl.DIRECTORY_TABLE, "",
               OmMetadataManagerImpl.KEY_TABLE, "",
               OmMetadataManagerImpl.FILE_TABLE, ""));
 
-      Iterator<Entry<byte[], byte[]>> oldObjectIdIter =
-          oldObjectIdKeyMap.iterator();
-      int oldObjectIdCnt = 0;
-      while (oldObjectIdIter.hasNext()) {
-        Entry<byte[], byte[]> v = oldObjectIdIter.next();
-        long objectId = this.codecRegistry.asObject(v.getKey(), Long.class);
-        Assertions.assertTrue(objectId % 2 == 0);
-        Assertions.assertTrue(objectId >= 50);
-        Assertions.assertTrue(objectId < 100);
-        oldObjectIdCnt += 1;
+      try (ClosableIterator<Map.Entry<byte[], byte[]>> oldObjectIdIter =
+               oldObjectIdKeyMap.iterator()) {
+        int oldObjectIdCnt = 0;
+        while (oldObjectIdIter.hasNext()) {
+          Map.Entry<byte[], byte[]> v = oldObjectIdIter.next();
+          long objectId = codecRegistry.asObject(v.getKey(), Long.class);
+          assertEquals(0, objectId % 2);
+          assertTrue(objectId >= 50);
+          assertTrue(objectId < 100);
+          oldObjectIdCnt += 1;
+        }
+        assertEquals(nativeLibraryLoaded ? 25 : 0, oldObjectIdCnt);
       }
-      Assertions.assertEquals(nativeLibraryLoaded ? 25 : 0, oldObjectIdCnt);
-      Iterator<Entry<byte[], byte[]>> newObjectIdIter =
-          newObjectIdKeyMap.iterator();
-      int newObjectIdCnt = 0;
-      while (newObjectIdIter.hasNext()) {
-        Entry<byte[], byte[]> v = newObjectIdIter.next();
-        long objectId = this.codecRegistry.asObject(v.getKey(), Long.class);
-        Assertions.assertTrue(objectId % 2 == 0);
-        Assertions.assertTrue(objectId >= 26);
-        Assertions.assertTrue(objectId < 50);
-        newObjectIdCnt += 1;
+
+      try (ClosableIterator<Map.Entry<byte[], byte[]>> newObjectIdIter =
+               newObjectIdKeyMap.iterator()) {
+        int newObjectIdCnt = 0;
+        while (newObjectIdIter.hasNext()) {
+          Map.Entry<byte[], byte[]> v = newObjectIdIter.next();
+          long objectId = codecRegistry.asObject(v.getKey(), Long.class);
+          assertEquals(0, objectId % 2);
+          assertTrue(objectId >= 26);
+          assertTrue(objectId < 50);
+          newObjectIdCnt += 1;
+        }
+        assertEquals(12, newObjectIdCnt);
       }
-      Assertions.assertEquals(12, newObjectIdCnt);
-
-      ClosableIterator<Entry<byte[], SnapshotDiffObject>> objectIdsToCheckIter 
=
-          objectIdsToCheck.iterator();
-      int objectIdCnt = 0;
-      while (objectIdsToCheckIter.hasNext()) {
-        Entry<byte[], SnapshotDiffObject> entry = objectIdsToCheckIter.next();
-        byte[] v = entry.getKey();
-        long objectId = this.codecRegistry.asObject(v, Long.class);
-        Assertions.assertTrue(objectId % 2 == 0);
-        Assertions.assertTrue(objectId >= 26);
-        Assertions.assertTrue(objectId < (nativeLibraryLoaded ? 100 : 50));
-        objectIdCnt += 1;
+
+      try (ClosableIterator<Entry<byte[], SnapshotDiffObject>>
+               objectIdsToCheckIter = objectIdsToCheck.iterator()) {
+        int objectIdCnt = 0;
+        while (objectIdsToCheckIter.hasNext()) {
+          Entry<byte[], SnapshotDiffObject> entry = 
objectIdsToCheckIter.next();
+          byte[] v = entry.getKey();
+          long objectId = codecRegistry.asObject(v, Long.class);
+          assertEquals(0, objectId % 2);
+          assertTrue(objectId >= 26);
+          assertTrue(objectId < (nativeLibraryLoaded ? 100 : 50));
+          objectIdCnt += 1;
+        }
+        assertEquals(nativeLibraryLoaded ? 37 : 12, objectIdCnt);
       }
-      Assertions.assertEquals(nativeLibraryLoaded ? 37 : 12, objectIdCnt);
     }
   }
 
-  /**
-    Testing generateDiffReport function by providing PersistentMap containing
-    objectId Map of diff keys to be checked with their corresponding key names.
-   */
   @Test
   public void testGenerateDiffReport() throws IOException {
-    // Mocking RocksDbPersistentMap constructor to use stubbed
-    // implementation instead.
-    try (MockedConstruction<RocksDbPersistentMap> mockedRocksDbPersistentMap =
-            Mockito.mockConstruction(RocksDbPersistentMap.class,
-                (mock, context) -> {
-                  PersistentMap obj =
-                      new SnapshotTestUtils.StubbedPersistentMap<>();
-                  when(mock.iterator()).thenReturn(obj.iterator());
-                  when(mock.iterator(Mockito.any(Optional.class),
-                      Mockito.any(Optional.class)))
-                      .thenAnswer(i -> obj.iterator(i.getArgument(0),
-                          i.getArgument(1)));
-                  when(mock.get(Matchers.any()))
-                      .thenAnswer(i -> obj.get(i.getArgument(0)));
-                  Mockito.doAnswer((Answer<Void>) i -> {
-                    obj.put(i.getArgument(0), i.getArgument(1));
-                    return null;
-                  }).when(mock).put(Matchers.any(), Matchers.any());
-                });
-        MockedConstruction<RocksDbPersistentList> mockedPersistentList =
-            Mockito.mockConstruction(
-                RocksDbPersistentList.class, (mock, context) -> {
-                  PersistentList obj =
-                      new SnapshotTestUtils.ArrayPersistentList<>();
-                  Mockito.when(mock.add(Matchers.any()))
-                      .thenAnswer(i -> obj.add(i.getArgument(0)));
-                  Mockito.when(mock.get(Matchers.anyInt()))
-                      .thenAnswer(i -> obj.get(i.getArgument(0)));
-                  Mockito.when(mock.addAll(Matchers.any(PersistentList.class)))
-                      .thenAnswer(i -> obj.addAll(i.getArgument(0)));
-                  Mockito.when(mock.iterator())
-                      .thenAnswer(i -> obj.iterator());
-                })) {
-      PersistentMap<byte[], byte[]> oldObjectIdKeyMap =
-          new SnapshotTestUtils.StubbedPersistentMap<>();
-      PersistentMap<byte[], byte[]> newObjectIdKeyMap =
-          new SnapshotTestUtils.StubbedPersistentMap<>();
-      PersistentMap<byte[], SnapshotDiffObject> objectIdToDiffObject =
-          new SnapshotTestUtils.StubbedPersistentMap<>();
-      Map<Long, SnapshotDiffReport.DiffType> diffMap = new HashMap<>();
-      LongStream.range(0, 100).forEach(objectId -> {
-        try {
-          SnapshotDiffObjectBuilder builder =
-              new SnapshotDiffObjectBuilder(objectId);
-          String key = "key" + objectId;
-          byte[] objectIdVal = codecRegistry.asRawData(objectId);
-          byte[] keyBytes = codecRegistry.asRawData(key);
-          if (objectId >= 0 && objectId <= 25 ||
-              objectId >= 50 && objectId <= 100) {
-            oldObjectIdKeyMap.put(objectIdVal, keyBytes);
-            builder.withOldKeyName(key);
-          }
-          if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0 ||
-              objectId > 25 && objectId < 50) {
-            newObjectIdKeyMap.put(objectIdVal, keyBytes);
-            builder.withNewKeyName(key);
-          }
-          if (objectId >= 0 && objectId <= 25 && objectId % 4 == 1) {
-            String renamedKey = "renamed-key" + objectId;
-            byte[] renamedKeyBytes = codecRegistry.asRawData(renamedKey);
-            newObjectIdKeyMap.put(objectIdVal, renamedKeyBytes);
-            diffMap.put(objectId, SnapshotDiffReport.DiffType.RENAME);
-            builder.withOldKeyName(key);
-            builder.withNewKeyName(renamedKey);
-          }
-          objectIdToDiffObject.put(objectIdVal, builder.build());
-          if (objectId >= 50 && objectId <= 100 ||
-              objectId >= 0 && objectId <= 25 && objectId % 4 > 1) {
-            diffMap.put(objectId, SnapshotDiffReport.DiffType.DELETE);
-          }
-          if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0) {
-            diffMap.put(objectId, SnapshotDiffReport.DiffType.MODIFY);
-          }
-          if (objectId > 25 && objectId < 50) {
-            diffMap.put(objectId, SnapshotDiffReport.DiffType.CREATE);
-          }
-        } catch (IOException e) {
-          throw new RuntimeException(e);
+    PersistentMap<byte[], byte[]> oldObjectIdKeyMap =
+        new StubbedPersistentMap<>();
+    PersistentMap<byte[], byte[]> newObjectIdKeyMap =
+        new StubbedPersistentMap<>();
+    PersistentMap<byte[], SnapshotDiffObject> objectIdToDiffObject =
+        new SnapshotTestUtils.StubbedPersistentMap<>();
+    Map<Long, SnapshotDiffReport.DiffType> diffMap = new HashMap<>();
+    LongStream.range(0, 100).forEach(objectId -> {
+      try {
+        SnapshotDiffObjectBuilder builder =
+            new SnapshotDiffObjectBuilder(objectId);
+        String key = "key" + objectId;
+        byte[] objectIdVal = codecRegistry.asRawData(objectId);
+        byte[] keyBytes = codecRegistry.asRawData(key);
+        if (objectId >= 0 && objectId <= 25 ||
+            objectId >= 50 && objectId <= 100) {
+          oldObjectIdKeyMap.put(objectIdVal, keyBytes);
+          builder.withOldKeyName(key);
+        }
+        if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0 ||
+            objectId > 25 && objectId < 50) {
+          newObjectIdKeyMap.put(objectIdVal, keyBytes);
+          builder.withNewKeyName(key);
         }
-      });
-      String volumeName = "vol";
-      String bucketName = "buck";
-      String fromSnapName = "fs";
-      String toSnapName = "ts";
-      UUID fromSnapId = UUID.randomUUID();
-      UUID toSnapId = UUID.randomUUID();
-
-      OmKeyInfo fromKeyInfo = mock(OmKeyInfo.class);
-      OmKeyInfo toKeyInfo = mock(OmKeyInfo.class);
-      // This is temporary to make sure that
-      // SnapshotDeletingService#isBlockLocationInfoSame always return true.
-      when(toKeyInfo.isHsync()).thenReturn(true);
-      when(fromKeyInfo.isHsync()).thenReturn(true);
-
-      Table<String, OmKeyInfo> fromSnapTable = mock(Table.class);
-      Table<String, OmKeyInfo> toSnapTable = mock(Table.class);
-      when(fromSnapTable.get(anyString())).thenReturn(fromKeyInfo);
-      when(toSnapTable.get(anyString())).thenReturn(toKeyInfo);
-
-      SnapshotDiffManager snapshotDiffManager =
-          getMockedSnapshotDiffManager(10);
-
-      setupMocksForRunningASnapDiff(volumeName, bucketName);
-      setUpSnapshots(volumeName, bucketName,
-          fromSnapName, toSnapName, fromSnapId, toSnapId);
-      String jobKey = fromSnapId + DELIMITER + toSnapId;
-
-      SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId",
-          JobStatus.IN_PROGRESS, volumeName,
-          bucketName, fromSnapName, toSnapName,
-          true, diffMap.size());
-
-      snapshotDiffManager.getSnapDiffJobTable().put(jobKey, snapshotDiffJob);
-
-      snapshotDiffManager.generateDiffReport("jobId", fromSnapTable,
-          toSnapTable, objectIdToDiffObject, oldObjectIdKeyMap,
-          newObjectIdKeyMap, volumeName, bucketName, fromSnapName, toSnapName,
-          false, null, null);
-
-      snapshotDiffJob.setStatus(JobStatus.DONE);
-      snapshotDiffManager.getSnapDiffJobTable().put(jobKey, snapshotDiffJob);
-
-      SnapshotDiffReportOzone snapshotDiffReportOzone =
-          snapshotDiffManager.createPageResponse(snapshotDiffJob, volumeName,
-              bucketName, fromSnapName, toSnapName,
-              0, Integer.MAX_VALUE);
-      Set<SnapshotDiffReport.DiffType> expectedOrder = new LinkedHashSet<>();
-      expectedOrder.add(SnapshotDiffReport.DiffType.DELETE);
-      expectedOrder.add(SnapshotDiffReport.DiffType.RENAME);
-      expectedOrder.add(SnapshotDiffReport.DiffType.CREATE);
-      expectedOrder.add(SnapshotDiffReport.DiffType.MODIFY);
-
-      Set<SnapshotDiffReport.DiffType> actualOrder = new LinkedHashSet<>();
-      for (SnapshotDiffReport.DiffReportEntry entry :
-          snapshotDiffReportOzone.getDiffList()) {
-        actualOrder.add(entry.getType());
-
-        long objectId = Long.parseLong(
-            DFSUtilClient.bytes2String(entry.getSourcePath()).substring(4));
-        Assertions.assertEquals(diffMap.get(objectId), entry.getType());
+        if (objectId >= 0 && objectId <= 25 && objectId % 4 == 1) {
+          String renamedKey = "renamed-key" + objectId;
+          byte[] renamedKeyBytes = codecRegistry.asRawData(renamedKey);
+          newObjectIdKeyMap.put(objectIdVal, renamedKeyBytes);
+          diffMap.put(objectId, SnapshotDiffReport.DiffType.RENAME);
+          builder.withOldKeyName(key);
+          builder.withNewKeyName(renamedKey);
+        }
+        objectIdToDiffObject.put(objectIdVal, builder.build());
+        if (objectId >= 50 && objectId <= 100 ||
+            objectId >= 0 && objectId <= 25 && objectId % 4 > 1) {
+          diffMap.put(objectId, SnapshotDiffReport.DiffType.DELETE);
+        }
+        if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0) {
+          diffMap.put(objectId, SnapshotDiffReport.DiffType.MODIFY);
+        }
+        if (objectId > 25 && objectId < 50) {
+          diffMap.put(objectId, SnapshotDiffReport.DiffType.CREATE);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-      Assertions.assertEquals(expectedOrder, actualOrder);
+    });
+
+    String volumeName = "vol";
+    String bucketName = "buck";
+    String fromSnapName = "fs";
+    String toSnapName = "ts";
+
+    OmKeyInfo fromKeyInfo = mock(OmKeyInfo.class);
+    OmKeyInfo toKeyInfo = mock(OmKeyInfo.class);
+    // This is temporary to make sure that
+    // SnapshotDeletingService#isBlockLocationInfoSame always return true.
+    when(toKeyInfo.isHsync()).thenReturn(true);
+    when(fromKeyInfo.isHsync()).thenReturn(true);
+
+    Table<String, OmKeyInfo> fromSnapTable = mock(Table.class);
+    Table<String, OmKeyInfo> toSnapTable = mock(Table.class);
+    when(fromSnapTable.get(anyString())).thenReturn(fromKeyInfo);
+    when(toSnapTable.get(anyString())).thenReturn(toKeyInfo);
+
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+    doReturn(true).when(spy)
+        .areDiffJobAndSnapshotsActive(volumeName, bucketName, fromSnapName,
+            toSnapName);
+
+    long totalDiffEntries = spy.generateDiffReport("jobId",
+        fromSnapTable, toSnapTable, objectIdToDiffObject, oldObjectIdKeyMap,
+        newObjectIdKeyMap, volumeName, bucketName, fromSnapName, toSnapName,
+        false, Optional.empty(), Optional.empty());
+
+    assertEquals(100, totalDiffEntries);
+    SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId",
+        JobStatus.DONE, "vol", "buck", "fs", "ts",
+        true, diffMap.size());
+    SnapshotDiffReportOzone snapshotDiffReportOzone =
+        snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
+            "buck", "fs", "ts",
+            0, Integer.MAX_VALUE);
+    Set<SnapshotDiffReport.DiffType> expectedOrder = new LinkedHashSet<>();
+    expectedOrder.add(SnapshotDiffReport.DiffType.DELETE);
+    expectedOrder.add(SnapshotDiffReport.DiffType.RENAME);
+    expectedOrder.add(SnapshotDiffReport.DiffType.CREATE);
+    expectedOrder.add(SnapshotDiffReport.DiffType.MODIFY);
+
+    Set<SnapshotDiffReport.DiffType> actualOrder = new LinkedHashSet<>();
+    for (DiffReportEntry entry :
+        snapshotDiffReportOzone.getDiffList()) {
+      actualOrder.add(entry.getType());
+
+      long objectId = Long.parseLong(
+          DFSUtilClient.bytes2String(entry.getSourcePath()).substring(4));
+      assertEquals(diffMap.get(objectId), entry.getType());
     }
+    assertEquals(expectedOrder, actualOrder);
   }
 
-  private SnapshotDiffReport.DiffReportEntry getTestDiffEntry(String jobId,
-          int idx) throws IOException {
-    return new SnapshotDiffReport.DiffReportEntry(
+  private DiffReportEntry getTestDiffEntry(String jobId,
+                                           int idx) throws IOException {
+    return new DiffReportEntry(
         SnapshotDiffReport.DiffType.values()[idx %
             SnapshotDiffReport.DiffType.values().length],
         codecRegistry.asRawData(jobId + DELIMITER + idx));
   }
 
   /**
-   Testing generateDiffReport function by providing PersistentMap containing
-   objectId Map of diff keys to be checked with their corresponding key names.
+   * Testing generateDiffReport function by providing PersistentMap containing
+   * objectId Map of diff keys to be checked with their corresponding key 
names.
    */
   @ParameterizedTest
   @CsvSource({"0,10,1000", "1,10,8", "1000,1000,10", "-1,1000,10000",
       "1,0,1000", "1,-1,1000"})
-  public void testCreatePageResponse(int startIdx, int pageSize,
-        int totalNumberOfRecords) throws IOException {
-    // Mocking RocksDbPersistentMap constructor to use stubbed
-    // implementation instead.
-    Map<ColumnFamilyHandle, RocksDbPersistentMap>
-        cfHandleRocksDbPersistentMap = new HashMap<>();
-    try (MockedConstruction<RocksDbPersistentMap> mockedRocksDbPersistentMap =
-            Mockito.mockConstruction(RocksDbPersistentMap.class,
-                (mock, context) -> {
-                  ColumnFamilyHandle cf =
-                      (ColumnFamilyHandle) context.arguments().stream()
-                          .filter(arg -> arg instanceof ColumnFamilyHandle)
-                          .findFirst().get();
-                  cfHandleRocksDbPersistentMap.put(cf, mock);
-                  PersistentMap obj =
-                      new SnapshotTestUtils.StubbedPersistentMap<>();
-                  when(mock.iterator()).thenReturn(obj.iterator());
-                  when(mock.iterator(any(Optional.class),
-                      any(Optional.class))).thenAnswer(i ->
-                      obj.iterator(i.getArgument(0), i.getArgument(1)));
-                  Mockito.when(mock.get(Matchers.any()))
-                      .thenAnswer(i -> obj.get(i.getArgument(0)));
-                  Mockito.doAnswer((Answer<Void>) i -> {
-                    obj.put(i.getArgument(0), i.getArgument(1));
-                    return null;
-                  }).when(mock).put(Matchers.any(), Matchers.any());
-              })) {
-      String testJobId = "jobId";
-      String testJobId2 = "jobId2";
-      SnapshotDiffManager snapshotDiffManager =
-          getMockedSnapshotDiffManager(10);
-      IntStream.range(0, totalNumberOfRecords).boxed().forEach(idx -> {
-        try {
-          cfHandleRocksDbPersistentMap.get(snapdiffReportCFH)
-              .put(codecRegistry.asRawData(SnapshotDiffManager
-                      .getReportKeyForIndex(testJobId, idx)),
-                  codecRegistry.asRawData(getTestDiffEntry(testJobId, idx)));
-          cfHandleRocksDbPersistentMap.get(snapdiffReportCFH)
-              .put(codecRegistry.asRawData(SnapshotDiffManager
-                      .getReportKeyForIndex(testJobId2, idx)),
-                  codecRegistry.asRawData(getTestDiffEntry(testJobId2, idx)));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      });
-      SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId,
-          SnapshotDiffResponse.JobStatus.DONE, "vol", "buck", "fs", "ts",
-          true, totalNumberOfRecords);
-      SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2,
-          SnapshotDiffResponse.JobStatus.DONE, "vol", "buck", "fs", "ts",
-          true, totalNumberOfRecords);
-      cfHandleRocksDbPersistentMap.get(snapdiffJobCFH)
-          .put(codecRegistry.asRawData(testJobId), snapshotDiffJob);
-      cfHandleRocksDbPersistentMap.get(snapdiffJobCFH)
-          .put(codecRegistry.asRawData(testJobId), snapshotDiffJob2);
-      if (pageSize <= 0 || startIdx < 0) {
-        Assertions.assertThrows(IllegalArgumentException.class,
-            () -> snapshotDiffManager.createPageResponse(snapshotDiffJob, 
"vol",
-            "buck", "fs", "ts", startIdx, pageSize));
-        return;
-      }
-      SnapshotDiffReportOzone snapshotDiffReportOzone =
-          snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
-              "buck", "fs", "ts",
-              startIdx, pageSize);
-      int expectedTotalNumberOfRecords =
-          Math.max(Math.min(pageSize, totalNumberOfRecords - startIdx), 0);
-      Assertions.assertEquals(snapshotDiffReportOzone.getDiffList().size(),
-          expectedTotalNumberOfRecords);
-
-      int idx = startIdx;
-      for (SnapshotDiffReport.DiffReportEntry entry :
-          snapshotDiffReportOzone.getDiffList()) {
-        Assertions.assertEquals(getTestDiffEntry(testJobId, idx), entry);
-        idx++;
+  public void testCreatePageResponse(int startIdx,
+                                     int pageSize,
+                                     int totalNumberOfRecords)
+      throws IOException, RocksDBException {
+    String testJobId = "jobId";
+    String testJobId2 = "jobId2";
+
+    IntStream.range(0, totalNumberOfRecords).boxed().forEach(idx -> {
+      try {
+        db.get().put(snapDiffReportTable,
+            codecRegistry.asRawData(SnapshotDiffManager
+                .getReportKeyForIndex(testJobId, idx)),
+            codecRegistry.asRawData(getTestDiffEntry(testJobId, idx)));
+        db.get().put(snapDiffReportTable,
+            codecRegistry.asRawData(testJobId2 + DELIMITER + idx),
+            codecRegistry.asRawData(getTestDiffEntry(testJobId2, idx)));
+      } catch (IOException | RocksDBException e) {
+        throw new RuntimeException(e);
       }
+    });
+
+    SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId,
+        JobStatus.DONE, "vol", "buck", "fs", "ts",
+        true, totalNumberOfRecords);
+
+    SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2,
+        JobStatus.DONE, "vol", "buck", "fs", "ts",
+        true, totalNumberOfRecords);
+
+    db.get().put(snapDiffJobTable,
+        codecRegistry.asRawData(testJobId),
+        codecRegistry.asRawData(snapshotDiffJob));
+
+    db.get().put(snapDiffJobTable,
+        codecRegistry.asRawData(testJobId2),
+        codecRegistry.asRawData(snapshotDiffJob2));
+
+    if (pageSize <= 0 || startIdx < 0) {
+      Assertions.assertThrows(IllegalArgumentException.class,
+          () -> snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
+              "buck", "fs", "ts", startIdx, pageSize));
+      return;
+    }
+    SnapshotDiffReportOzone snapshotDiffReportOzone =
+        snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
+            "buck", "fs", "ts",
+            startIdx, pageSize);
+    int expectedTotalNumberOfRecords =
+        Math.max(Math.min(pageSize, totalNumberOfRecords - startIdx), 0);
+    assertEquals(snapshotDiffReportOzone.getDiffList().size(),
+        expectedTotalNumberOfRecords);
+
+    int idx = startIdx;
+    for (DiffReportEntry entry : snapshotDiffReportOzone.getDiffList()) {
+      assertEquals(getTestDiffEntry(testJobId, idx), entry);
+      idx++;
     }
   }
 
   /**
    * Once a job is cancelled, it stays in the table until
    * SnapshotDiffCleanupService removes it.
-   *
    * Job response until that happens, is CANCELLED.
    */
   @Test
-  public void testGetSnapshotDiffReportForCancelledJob()
-      throws IOException {
-    SnapshotDiffManager snapshotDiffManager =
-        getMockedSnapshotDiffManager(10);
+  public void testGetSnapshotDiffReportForCancelledJob() throws IOException {
 
     String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
     String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
@@ -732,55 +847,56 @@ public class TestSnapshotDiffManager {
     setUpSnapshots(volumeName, bucketName, fromSnapshotName,
         toSnapshotName, fromSnapshotUUID, toSnapshotUUID);
 
-    PersistentMap<String, SnapshotDiffJob> snapDiffJobTable =
+    PersistentMap<String, SnapshotDiffJob> snapDiffJobMap =
         snapshotDiffManager.getSnapDiffJobTable();
     String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID;
 
-    SnapshotDiffJob diffJob = snapDiffJobTable.get(diffJobKey);
+    SnapshotDiffJob diffJob = snapDiffJobMap.get(diffJobKey);
     Assertions.assertNull(diffJob);
 
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+    doNothing().when(spy).generateSnapshotDiffReport(eq(diffJobKey),
+        anyString(), eq(volumeName), eq(bucketName), eq(fromSnapshotName),
+        eq(toSnapshotName), eq(false));
+
     // Submit a new job.
-    SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager
-        .getSnapshotDiffReport(volumeName, bucketName,
-            fromSnapshotName, toSnapshotName,
-            0, 0, false);
+    SnapshotDiffResponse snapshotDiffResponse =
+        spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
+            toSnapshotName, 0, 0, false);
 
-    Assertions.assertEquals(JobStatus.IN_PROGRESS,
+    assertEquals(JobStatus.IN_PROGRESS,
         snapshotDiffResponse.getJobStatus());
 
     // Cancel the job.
-    snapshotDiffManager.cancelSnapshotDiff(volumeName, bucketName,
+    spy.cancelSnapshotDiff(volumeName, bucketName,
         fromSnapshotName, toSnapshotName);
 
     // Job status should be cancelled until the cleanup
     // service removes the job from the table.
-    snapshotDiffResponse = snapshotDiffManager
-        .getSnapshotDiffReport(volumeName, bucketName,
-            fromSnapshotName, toSnapshotName,
-            0, 0, false);
+    snapshotDiffResponse = spy.getSnapshotDiffReport(volumeName, bucketName,
+        fromSnapshotName, toSnapshotName, 0, 0, false);
 
-    Assertions.assertEquals(JobStatus.CANCELLED,
+    assertEquals(JobStatus.CANCELLED,
         snapshotDiffResponse.getJobStatus());
 
     // Check snapDiffJobTable.
-    diffJob = snapDiffJobTable.get(diffJobKey);
-    Assertions.assertNotNull(diffJob);
-    Assertions.assertEquals(JobStatus.CANCELLED,
+    diffJob = snapDiffJobMap.get(diffJobKey);
+    assertNotNull(diffJob);
+    assertEquals(JobStatus.CANCELLED,
         diffJob.getStatus());
 
     // Response should still be cancelled.
-    snapshotDiffResponse = snapshotDiffManager
-        .getSnapshotDiffReport(volumeName, bucketName,
-            fromSnapshotName, toSnapshotName,
-            0, 0, false);
+    snapshotDiffResponse = spy.getSnapshotDiffReport(volumeName, bucketName,
+        fromSnapshotName, toSnapshotName, 0, 0, false);
 
-    Assertions.assertEquals(JobStatus.CANCELLED,
+    assertEquals(JobStatus.CANCELLED,
         snapshotDiffResponse.getJobStatus());
 
     // Check snapDiffJobTable.
-    diffJob = snapDiffJobTable.get(diffJobKey);
-    Assertions.assertNotNull(diffJob);
-    Assertions.assertEquals(JobStatus.CANCELLED,
+    diffJob = snapDiffJobMap.get(diffJobKey);
+    assertNotNull(diffJob);
+    assertEquals(JobStatus.CANCELLED,
         diffJob.getStatus());
   }
 
@@ -807,8 +923,6 @@ public class TestSnapshotDiffManager {
                                             JobCancelResult cancelResult,
                                             boolean jobIsCancelled)
       throws IOException {
-    SnapshotDiffManager snapshotDiffManager =
-        getMockedSnapshotDiffManager(10);
 
     String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
     String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
@@ -824,7 +938,7 @@ public class TestSnapshotDiffManager {
     setUpSnapshots(volumeName, bucketName, fromSnapshotName,
         toSnapshotName, fromSnapshotUUID, toSnapshotUUID);
 
-    PersistentMap<String, SnapshotDiffJob> snapDiffJobTable =
+    PersistentMap<String, SnapshotDiffJob> snapDiffJobMap =
         snapshotDiffManager.getSnapDiffJobTable();
     String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID;
 
@@ -833,27 +947,21 @@ public class TestSnapshotDiffManager {
         jobId, jobStatus, volumeName, bucketName,
         fromSnapshotName, toSnapshotName, true, 10);
 
-    snapDiffJobTable.put(diffJobKey, snapshotDiffJob);
+    snapDiffJobMap.put(diffJobKey, snapshotDiffJob);
 
     SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager
         .cancelSnapshotDiff(volumeName, bucketName,
             fromSnapshotName, toSnapshotName);
 
-    Assertions.assertEquals(cancelResult,
-        snapshotDiffResponse.getJobCancelResult());
+    assertEquals(cancelResult, snapshotDiffResponse.getJobCancelResult());
 
     if (jobIsCancelled) {
-      Assertions.assertEquals(JobStatus.CANCELLED,
-          snapshotDiffResponse.getJobStatus());
+      assertEquals(JobStatus.CANCELLED, snapshotDiffResponse.getJobStatus());
     }
   }
 
   @Test
-  public void testCancelNewSnapshotDiff()
-      throws IOException {
-    SnapshotDiffManager snapshotDiffManager =
-        getMockedSnapshotDiffManager(10);
-
+  public void testCancelNewSnapshotDiff() throws IOException {
     String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
     String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
 
@@ -874,7 +982,7 @@ public class TestSnapshotDiffManager {
 
     // The job doesn't exist on the SnapDiffJob table and
     // trying to cancel it should lead to NEW_JOB cancel result.
-    Assertions.assertEquals(JobCancelResult.NEW_JOB,
+    assertEquals(JobCancelResult.NEW_JOB,
         snapshotDiffResponse.getJobCancelResult());
   }
 
@@ -897,48 +1005,46 @@ public class TestSnapshotDiffManager {
                                        boolean listAll,
                                        boolean containsJob)
       throws IOException {
-    SnapshotDiffManager snapshotDiffManager =
-        getMockedSnapshotDiffManager(10);
-
     String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
     String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
-
     String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
     String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
 
     UUID fromSnapshotUUID = UUID.randomUUID();
     UUID toSnapshotUUID = UUID.randomUUID();
 
-    setupMocksForRunningASnapDiff(volumeName, bucketName);
-
     setUpSnapshots(volumeName, bucketName, fromSnapshotName,
         toSnapshotName, fromSnapshotUUID, toSnapshotUUID);
 
-    PersistentMap<String, SnapshotDiffJob> snapDiffJobTable =
+    PersistentMap<String, SnapshotDiffJob> snapDiffJobMap =
         snapshotDiffManager.getSnapDiffJobTable();
     String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID;
 
-    SnapshotDiffJob diffJob = snapDiffJobTable.get(diffJobKey);
-    Assertions.assertNull(diffJob);
+    SnapshotDiffJob diffJob = snapDiffJobMap.get(diffJobKey);
+    assertNull(diffJob);
 
     // There are no jobs in the table, therefore
     // the response list should be empty.
     List<SnapshotDiffJob> jobList = snapshotDiffManager
         .getSnapshotDiffJobList(volumeName, bucketName, jobStatus, listAll);
-    Assertions.assertTrue(jobList.isEmpty());
+    assertTrue(jobList.isEmpty());
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+    doNothing().when(spy).generateSnapshotDiffReport(eq(diffJobKey),
+        anyString(), eq(volumeName), eq(bucketName), eq(fromSnapshotName),
+        eq(toSnapshotName), eq(false));
 
     // SnapshotDiffReport
-    SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager
-        .getSnapshotDiffReport(volumeName, bucketName,
-            fromSnapshotName, toSnapshotName,
-            0, 0, false);
+    SnapshotDiffResponse snapshotDiffResponse =
+        spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
+            toSnapshotName, 0, 0, false);
 
-    Assertions.assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS,
+    assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS,
         snapshotDiffResponse.getJobStatus());
 
-    diffJob = snapDiffJobTable.get(diffJobKey);
-    Assertions.assertNotNull(diffJob);
-    Assertions.assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS,
+    diffJob = snapDiffJobMap.get(diffJobKey);
+    assertNotNull(diffJob);
+    assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS,
         diffJob.getStatus());
 
     jobList = snapshotDiffManager
@@ -949,117 +1055,314 @@ public class TestSnapshotDiffManager {
     // there should be a response.
     // Otherwise, response list should be empty.
     if (containsJob) {
-      Assertions.assertTrue(jobList.contains(diffJob));
+      assertTrue(jobList.contains(diffJob));
     } else {
-      Assertions.assertTrue(jobList.isEmpty());
+      assertTrue(jobList.isEmpty());
     }
   }
 
   @Test
   public void testListSnapDiffWithInvalidStatus() throws IOException {
-    SnapshotDiffManager snapshotDiffManager =
-        getMockedSnapshotDiffManager(10);
-
     String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
     String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
-
     String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
     String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
 
     UUID fromSnapshotUUID = UUID.randomUUID();
     UUID toSnapshotUUID = UUID.randomUUID();
 
-    setupMocksForRunningASnapDiff(volumeName, bucketName);
-
     setUpSnapshots(volumeName, bucketName, fromSnapshotName,
         toSnapshotName, fromSnapshotUUID, toSnapshotUUID);
 
-    // SnapshotDiffReport
-    snapshotDiffManager.getSnapshotDiffReport(volumeName, bucketName,
-        fromSnapshotName, toSnapshotName,
-        0, 0, false);
+    String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID;
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+
+    doNothing().when(spy).generateSnapshotDiffReport(eq(diffJobKey),
+        anyString(), eq(volumeName), eq(bucketName), eq(fromSnapshotName),
+        eq(toSnapshotName), eq(false));
+
+    spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
+            toSnapshotName, 0, 0, false);
 
     // Invalid status, without listAll true, results in an exception.
-    Assertions.assertThrows(IOException.class, () -> snapshotDiffManager
+    assertThrows(IOException.class, () -> snapshotDiffManager
         .getSnapshotDiffJobList(volumeName, bucketName, "invalid", false));
   }
 
+  @Test
+  public void testGenerateDiffReportWhenThereInEntry() {
+    PersistentMap<byte[], SnapshotDiffObject> objectIdToDiffObject =
+        new StubbedPersistentMap<>();
+    PersistentMap<byte[], byte[]> oldObjIdToKeyMap =
+        new StubbedPersistentMap<>();
+    PersistentMap<byte[], byte[]> newObjIdToKeyMap =
+        new StubbedPersistentMap<>();
+
+    long totalDiffEntries = snapshotDiffManager.generateDiffReport("jobId",
+        keyInfoTable,
+        keyInfoTable,
+        objectIdToDiffObject,
+        oldObjIdToKeyMap,
+        newObjIdToKeyMap,
+        "volume",
+        "bucket",
+        "fromSnapshot",
+        "toSnapshot",
+        false,
+        Optional.empty(),
+        Optional.empty());
+
+    assertEquals(0, totalDiffEntries);
+  }
+
+  @Test
+  public void testGenerateDiffReportFailure() throws IOException {
+    String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
+    String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
+    String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
+
+    PersistentMap<byte[], SnapshotDiffObject> objectIdToDiffObject =
+        new SnapshotTestUtils.StubbedPersistentMap<>();
+    PersistentMap<byte[], byte[]> oldObjIdToKeyMap =
+        new StubbedPersistentMap<>();
+    PersistentMap<byte[], byte[]> newObjIdToKeyMap =
+        new StubbedPersistentMap<>();
+    objectIdToDiffObject.put(codecRegistry.asRawData("randomKey"),
+        new SnapshotDiffObjectBuilder(1L).build());
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+    doReturn(true).when(spy)
+        .areDiffJobAndSnapshotsActive(volumeName, bucketName,
+            fromSnapshotName, toSnapshotName);
+
+    IllegalStateException exception = assertThrows(IllegalStateException.class,
+        () -> spy.generateDiffReport("jobId",
+            keyInfoTable,
+            keyInfoTable,
+            objectIdToDiffObject,
+            oldObjIdToKeyMap,
+            newObjIdToKeyMap,
+            volumeName,
+            bucketName,
+            fromSnapshotName,
+            toSnapshotName,
+            false,
+            Optional.empty(),
+            Optional.empty())
+    );
+    assertEquals("Old and new key name both are null",
+        exception.getMessage());
+  }
+
+  /**
+   * Tests that IN_PROGRESS jobs are submitted to the executor on the service
+   * startup.
+   */
+  @Test
+  public void testLoadJobsOnStartUp() throws Exception {
+    for (int i = 0; i < snapshotInfoList.size(); i++) {
+      uploadSnapshotDiffJobToDb(snapshotInfo, snapshotInfoList.get(i),
+          snapDiffJobs.get(i));
+    }
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+
+    doAnswer(invocation -> {
+          SnapshotDiffJob diffJob = getSnapshotDiffJobFromDb(snapshotInfo,
+              snapshotInfoList.get(1));
+          diffJob.setTotalDiffEntries(1L);
+          diffJob.setStatus(DONE);
+          uploadSnapshotDiffJobToDb(snapshotInfo,
+              snapshotInfoList.get(1),
+              diffJob);
+          return null;
+        }
+    ).when(spy).generateSnapshotDiffReport(anyString(), anyString(),
+        eq(VOLUME_NAME), eq(BUCKET_NAME), eq(snapshotInfo.getName()),
+        eq(snapshotInfoList.get(1).getName()), eq(false));
+
+    spy.loadJobsOnStartUp();
+
+    // Wait for sometime to make sure that job finishes.
+    Thread.sleep(1000L);
+
+    SnapshotDiffJob snapDiffJob = getSnapshotDiffJobFromDb(snapshotInfo,
+        snapshotInfoList.get(1));
+
+    assertEquals(DONE, snapDiffJob.getStatus());
+    assertEquals(1L, snapDiffJob.getTotalDiffEntries());
+  }
+
+  private SnapshotDiffJob getSnapshotDiffJobFromDb(SnapshotInfo fromSnapshot,
+                                                   SnapshotInfo toSnapshot)
+      throws IOException, RocksDBException {
+    String jobKey = generateSnapDiffJobKey.apply(fromSnapshot, toSnapshot);
+
+    byte[] bytes = db.get()
+        .get(snapDiffJobTable, codecRegistry.asRawData(jobKey));
+    return codecRegistry.asObject(bytes, SnapshotDiffJob.class);
+  }
+
+  private void uploadSnapshotDiffJobToDb(SnapshotInfo fromSnapshot,
+                                         SnapshotInfo toSnapshot,
+                                         SnapshotDiffJob diffJob)
+      throws IOException, RocksDBException {
+    String jobKey = generateSnapDiffJobKey.apply(fromSnapshot, toSnapshot);
+
+    byte[] keyBytes = codecRegistry.asRawData(jobKey);
+    byte[] jobBytes = codecRegistry.asRawData(diffJob);
+    db.get().put(snapDiffJobTable, keyBytes, jobBytes);
+  }
+
+  private static Stream<Arguments> threadPoolFullScenarios() {
+    return Stream.of(
+        Arguments.of("When there is a wait time between job batches",
+            500L, 45, 0),
+        Arguments.of("When there is no wait time between job batches",
+            0L, 20, 25)
+    );
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("threadPoolFullScenarios")
+  public void testThreadPoolIsFull(String description,
+                                   long waitBetweenBatches,
+                                   int expectInProgressJobsCount,
+                                   int expectRejectedJobsCount)
+      throws Exception {
+    ExecutorService executorService = new ThreadPoolExecutor(100, 100, 0,
+        TimeUnit.MILLISECONDS, new SynchronousQueue<>()
+    );
+
+    List<SnapshotInfo> snapshotInfos = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      UUID snapshotId = UUID.randomUUID();
+      String snapshotName = "snap-" + snapshotId;
+      SnapshotInfo snapInfo = new SnapshotInfo.Builder()
+          .setSnapshotId(snapshotId)
+          .setVolumeName(VOLUME_NAME)
+          .setBucketName(BUCKET_NAME)
+          .setName(snapshotName)
+          .setSnapshotPath("fromSnapshotPath")
+          .setCheckpointDir("fromSnapshotCheckpointDir")
+          .build();
+      snapshotInfos.add(snapInfo);
+
+      when(snapshotInfoTable.get(getTableKey(VOLUME_NAME, BUCKET_NAME,
+          snapshotName))).thenReturn(snapInfo);
+    }
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+
+    for (int i = 0; i < snapshotInfos.size(); i++) {
+      for (int j = i + 1; j < snapshotInfos.size(); j++) {
+        String fromSnapshotName = snapshotInfos.get(i).getName();
+        String toSnapshotName = snapshotInfos.get(j).getName();
+
+        doAnswer(invocation -> {
+          Thread.sleep(250L);
+          return null;
+        }).when(spy).generateSnapshotDiffReport(anyString(), anyString(),
+            eq(VOLUME_NAME), eq(BUCKET_NAME), eq(fromSnapshotName),
+            eq(toSnapshotName), eq(false));
+      }
+    }
+
+    List<Future<SnapshotDiffResponse>> futures = new ArrayList<>();
+    for (int i = 0; i < snapshotInfos.size(); i++) {
+      for (int j = i + 1; j < snapshotInfos.size(); j++) {
+        String fromSnapshotName = snapshotInfos.get(i).getName();
+        String toSnapshotName = snapshotInfos.get(j).getName();
+
+        Future<SnapshotDiffResponse> future = executorService.submit(
+            () -> submitJob(spy, fromSnapshotName, toSnapshotName));
+        futures.add(future);
+      }
+      Thread.sleep(waitBetweenBatches);
+    }
+
+    // Wait to make sure that all jobs finish before assertion.
+    Thread.sleep(1000L);
+    int inProgressJobsCount = 0;
+    int rejectedJobsCount = 0;
+
+    for (Future<SnapshotDiffResponse> future : futures) {
+      SnapshotDiffResponse response = future.get();
+      if (response.getJobStatus() == IN_PROGRESS) {
+        inProgressJobsCount++;
+      } else if (response.getJobStatus() == REJECTED) {
+        rejectedJobsCount++;
+      } else {
+        throw new IllegalStateException("Unexpected job status.");
+      }
+    }
+
+    assertEquals(expectInProgressJobsCount, inProgressJobsCount);
+    assertEquals(expectRejectedJobsCount, rejectedJobsCount);
+
+    int notFoundJobs = 0;
+    for (int i = 0; i < snapshotInfos.size(); i++) {
+      for (int j = i + 1; j < snapshotInfos.size(); j++) {
+        SnapshotDiffJob diffJob =
+            getSnapshotDiffJobFromDb(snapshotInfos.get(i),
+                snapshotInfos.get(j));
+        if (diffJob == null) {
+          notFoundJobs++;
+        }
+      }
+    }
+
+    // assert that rejected jobs were removed from the job table as well.
+    assertEquals(expectRejectedJobsCount, notFoundJobs);
+    executorService.shutdown();
+  }
+
+  private SnapshotDiffResponse submitJob(SnapshotDiffManager diffManager,
+                                         String fromSnapshotName,
+                                         String toSnapshotName) {
+    try {
+      return diffManager.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME,
+          fromSnapshotName, toSnapshotName, 0, 1000, false);
+    } catch (IOException exception) {
+      throw new RuntimeException(exception);
+    }
+  }
+
   private void setUpSnapshots(String volumeName, String bucketName,
                               String fromSnapshotName, String toSnapshotName,
                               UUID fromSnapshotUUID, UUID toSnapshotUUID)
       throws IOException {
-    try (MockedStatic<SnapshotUtils> mockedSnapUtils =
-             Mockito.mockStatic(SnapshotUtils.class)) {
-      // Create 1st snapshot.
-      SnapshotInfo fromSnapshotInfo =
-          getSnapshotInfoInstance(volumeName, bucketName,
-              fromSnapshotName, fromSnapshotUUID);
-      mockedSnapUtils.when(() -> SnapshotUtils
-              .getSnapshotInfo(ozoneManager, volumeName,
-                  bucketName, fromSnapshotName))
-          .thenReturn(fromSnapshotInfo);
-
-      String fromSnapKey = SnapshotInfo
-          .getTableKey(fromSnapshotInfo.getVolumeName(),
-              fromSnapshotInfo.getBucketName(), fromSnapshotInfo.getName());
-
-      Mockito.when(ozoneManager.getMetadataManager()
-              .getSnapshotInfoTable().get(fromSnapKey))
-          .thenReturn(fromSnapshotInfo);
-
-      mockedSnapUtils.when(() -> SnapshotUtils
-              .getSnapshotInfo(ozoneManager, fromSnapKey))
-          .thenReturn(fromSnapshotInfo);
-
-      OmSnapshot omSnapshotFrom = getMockedOmSnapshot(fromSnapKey);
-      snapshotCache.put(fromSnapKey, omSnapshotFrom);
-
-      // Create 2nd snapshot.
-      SnapshotInfo toSnapshotInfo =
-          getSnapshotInfoInstance(volumeName, bucketName,
-              toSnapshotName, toSnapshotUUID);
-
-      mockedSnapUtils.when(
-              () -> SnapshotUtils.getSnapshotInfo(ozoneManager,
-                  volumeName, bucketName, toSnapshotName))
-          .thenReturn(toSnapshotInfo);
-
-      String toSnapKey = SnapshotInfo
-          .getTableKey(toSnapshotInfo.getVolumeName(),
-              toSnapshotInfo.getBucketName(), toSnapshotInfo.getName());
-
-      Mockito.when(ozoneManager.getMetadataManager()
-          .getSnapshotInfoTable().get(toSnapKey)).thenReturn(toSnapshotInfo);
-
-      mockedSnapUtils.when(() -> SnapshotUtils
-              .getSnapshotInfo(ozoneManager, toSnapKey))
-          .thenReturn(toSnapshotInfo);
-
-      OmSnapshot omSnapshotTo = getMockedOmSnapshot(toSnapKey);
-      snapshotCache.put(toSnapKey, omSnapshotTo);
-    }
+
+
+    SnapshotInfo fromSnapshotInfo =
+        getSnapshotInfoInstance(volumeName, bucketName,
+            fromSnapshotName, fromSnapshotUUID);
+    SnapshotInfo toSnapshotInfo =
+        getSnapshotInfoInstance(volumeName, bucketName,
+            toSnapshotName, toSnapshotUUID);
+
+    String fromSnapKey = getTableKey(volumeName, bucketName, fromSnapshotName);
+    String toSnapKey = getTableKey(volumeName, bucketName, toSnapshotName);
+
+    when(snapshotInfoTable.get(fromSnapKey)).thenReturn(fromSnapshotInfo);
+    when(snapshotInfoTable.get(toSnapKey)).thenReturn(toSnapshotInfo);
   }
 
-  private SnapshotInfo getSnapshotInfoInstance(
-      String volumeName, String bucketName,
-      String snapshotName, UUID snapshotUUID) {
-    SnapshotInfo snapshotInfo = SnapshotInfo
-        .newInstance(volumeName, bucketName,
-            snapshotName, snapshotUUID,
-            System.currentTimeMillis());
-    snapshotInfo.setSnapshotStatus(SnapshotInfo
-        .SnapshotStatus.SNAPSHOT_ACTIVE);
-    return snapshotInfo;
+  private SnapshotInfo getSnapshotInfoInstance(String volumeName,
+                                               String bucketName,
+                                               String snapshotName,
+                                               UUID snapshotUUID) {
+    SnapshotInfo info = SnapshotInfo.newInstance(volumeName, bucketName,
+        snapshotName, snapshotUUID, System.currentTimeMillis());
+    info.setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE);
+    return info;
   }
 
   private void setupMocksForRunningASnapDiff(
       String volumeName, String bucketName)
       throws IOException {
-    Mockito.when(ozoneManager.getMetadataManager().getSnapshotInfoTable())
-        .thenReturn(Mockito.mock(Table.class));
-    Mockito.when(ozoneManager.getMetadataManager().getBucketTable())
-        .thenReturn(Mockito.mock(Table.class));
-
     Map<BucketLayout, String> keyTableMap = new HashMap<>();
     keyTableMap.put(BucketLayout.FILE_SYSTEM_OPTIMIZED,
         OmMetadataManagerImpl.FILE_TABLE);
@@ -1069,11 +1372,9 @@ public class TestSnapshotDiffManager {
         OmMetadataManagerImpl.KEY_TABLE);
 
     for (Map.Entry<BucketLayout, String> entry : keyTableMap.entrySet()) {
-      Mockito.when(ozoneManager.getMetadataManager()
-              .getKeyTable(entry.getKey()))
-          .thenReturn(Mockito.mock(Table.class));
-      Mockito.when(ozoneManager.getMetadataManager()
-              .getKeyTable(entry.getKey()).getName())
+      when(omMetadataManager.getKeyTable(entry.getKey()))
+          .thenReturn(keyInfoTable);
+      when(omMetadataManager.getKeyTable(entry.getKey()).getName())
           .thenReturn(entry.getValue());
     }
 
@@ -1085,9 +1386,89 @@ public class TestSnapshotDiffManager {
         .setOwner(ugi.getShortUserName())
         .build();
 
-    String bucketKey = ozoneManager.getMetadataManager()
-        .getBucketKey(volumeName, bucketName);
-    Mockito.when(ozoneManager.getMetadataManager().getBucketTable()
-        .get(bucketKey)).thenReturn(bucketInfo);
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    when(bucketInfoTable.get(bucketKey)).thenReturn(bucketInfo);
+  }
+
+  @Test
+  public void testGetSnapshotDiffReportHappyCase() throws Exception {
+    SnapshotInfo fromSnapInfo = snapshotInfo;
+    SnapshotInfo toSnapInfo = snapshotInfoList.get(0);
+
+    Set<String> testDeltaFiles = new HashSet<>();
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+
+    doReturn(testDeltaFiles).when(spy).getDeltaFiles(any(OmSnapshot.class),
+        any(OmSnapshot.class), anyList(), eq(fromSnapInfo), eq(toSnapInfo),
+        eq(false), anyMap(), anyString());
+
+    doReturn(testDeltaFiles).when(spy)
+        .getSSTFileListForSnapshot(any(OmSnapshot.class), anyList());
+
+    doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable),
+        any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap());
+    doNothing().when(spy).checkReportsIntegrity(any(), anyInt(), anyInt());
+
+    doReturn(10L).when(spy).generateDiffReport(anyString(),
+        any(), any(), any(), any(), any(), anyString(), anyString(),
+        anyString(), anyString(), anyBoolean(), any(), any());
+    doReturn(LEGACY).when(spy).getBucketLayout(VOLUME_NAME, BUCKET_NAME,
+        omMetadataManager);
+
+    spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME, fromSnapInfo.getName(),
+        toSnapInfo.getName(), 0, 1000, false);
+
+    Thread.sleep(1000L);
+    spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME, fromSnapInfo.getName(),
+        toSnapInfo.getName(), 0, 1000, false);
+
+    SnapshotDiffJob snapDiffJob = getSnapshotDiffJobFromDb(fromSnapInfo,
+        toSnapInfo);
+    assertEquals(DONE, snapDiffJob.getStatus());
+    assertEquals(10L, snapDiffJob.getTotalDiffEntries());
+  }
+
+  /**
+   * Tests that only QUEUED jobs are submitted to the executor and rest are
+   * short-circuited based on previous one.
+   */
+  @Disabled
+  @Test
+  public void testGetSnapshotDiffReportJob() throws Exception {
+    for (int i = 0; i < jobStatuses.size(); i++) {
+      uploadSnapshotDiffJobToDb(snapshotInfo, snapshotInfoList.get(i),
+          snapDiffJobs.get(i));
+    }
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+
+    doAnswer(invocation -> {
+          SnapshotDiffJob diffJob = getSnapshotDiffJobFromDb(snapshotInfo,
+              snapshotInfoList.get(0));
+          diffJob.setTotalDiffEntries(1L);
+          diffJob.setStatus(DONE);
+          uploadSnapshotDiffJobToDb(snapshotInfo,
+              snapshotInfoList.get(0),
+              diffJob);
+          return null;
+        }
+    ).when(spy).generateSnapshotDiffReport(anyString(), anyString(),
+        eq(VOLUME_NAME), eq(BUCKET_NAME), eq(snapshotInfo.getName()),
+        eq(snapshotInfoList.get(0).getName()), eq(false));
+
+    for (int i = 0; i < snapshotInfoList.size(); i++) {
+      SnapshotDiffResponse snapshotDiffReport =
+          spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME,
+              snapshotInfo.getName(), snapshotInfoList.get(i).getName(), 0,
+              1000,
+              false);
+      SnapshotDiffJob diffJob = snapDiffJobs.get(i);
+      if (diffJob.getStatus() == QUEUED) {
+        assertEquals(IN_PROGRESS, snapshotDiffReport.getJobStatus());
+      } else {
+        assertEquals(diffJob.getStatus(), snapshotDiffReport.getJobStatus());
+      }
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to