This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new 2554ec26d7 HDDS-7508. [Snapshot] Finish TestOMSnapshotDAG, restore CompactionNode SnapshotID field for debugging (#3981)
2554ec26d7 is described below

commit 2554ec26d772233d6a2b98810583d1876606967b
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Dec 6 16:00:45 2022 -0800

    HDDS-7508. [Snapshot] Finish TestOMSnapshotDAG, restore CompactionNode SnapshotID field for debugging (#3981)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   5 +
 .../common/src/main/resources/ozone-default.xml    |   9 +
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |  26 +-
 hadoop-hdds/rocksdb-checkpoint-differ/pom.xml      |   4 +
 .../org/apache/ozone/rocksdiff/CompactionNode.java |  80 ++++
 .../apache/ozone/rocksdiff/DifferSnapshotInfo.java |  63 +++
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 388 +++++++----------
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 482 ++++++++++++++-------
 .../hadoop/ozone/freon/TestOMSnapshotDAG.java      |  85 ++--
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |   3 +-
 10 files changed, 715 insertions(+), 430 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 4293f3ef1e..74ed487e8c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -99,6 +99,11 @@ public final class OzoneConfigKeys {
   public static final String OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF =
       "OFF";
 
+  public static final String OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE =
+      "ozone.metastore.rocksdb.cf.write.buffer.size";
+  public static final String
+      OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE_DEFAULT = "128MB";
+
   public static final String OZONE_UNSAFEBYTEOPERATIONS_ENABLED =
       "ozone.UnsafeByteOperations.enabled";
   public static final boolean OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 788bc0e8a5..3e8499bf71 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -669,6 +669,15 @@
       Check the rocksdb documentation for more details.
     </description>
   </property>
+  <property>
+    <name>ozone.metastore.rocksdb.cf.write.buffer.size</name>
+    <value>128MB</value>
+    <tag>OZONE, OM, SCM, STORAGE, PERFORMANCE</tag>
+    <description>
+      The write buffer (memtable) size for each column family of the rocksdb
+      store. Check the rocksdb documentation for more details.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.db.dirs</name>
     <value/>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 9f94e1d2c5..5aad1aa526 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -39,11 +39,14 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import com.google.common.base.Preconditions;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
 import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
 import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
 
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
@@ -87,6 +90,8 @@ public final class DBStoreBuilder {
   private ConfigurationSource configuration;
   private CodecRegistry registry;
   private String rocksDbStat;
+  // RocksDB column family write buffer size
+  private long rocksDbCfWriteBufferSize;
   private RocksDBConfiguration rocksDBConfiguration;
   // Flag to indicate if the RocksDB should be opened readonly.
   private boolean openReadOnly = false;
@@ -129,6 +134,10 @@ public final class DBStoreBuilder {
     this.rocksDbStat = configuration.getTrimmed(
         OZONE_METADATA_STORE_ROCKSDB_STATISTICS,
         OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT);
+    this.rocksDbCfWriteBufferSize = (long) configuration.getStorageSize(
+        OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE,
+        OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE_DEFAULT,
+        StorageUnit.BYTES);
     this.rocksDBConfiguration = rocksDBConfiguration;
 
     // Get default DBOptions and ColumnFamilyOptions from the default DB
@@ -275,7 +284,7 @@ public final class DBStoreBuilder {
 
     // If default column family was not added, add it with the default options.
     cfOptions.putIfAbsent(DEFAULT_COLUMN_FAMILY_NAME,
-        getDefaultCfOptions());
+            getCfOptions(rocksDbCfWriteBufferSize));
 
     for (Map.Entry<String, ManagedColumnFamilyOptions> entry:
         cfOptions.entrySet()) {
@@ -284,7 +293,8 @@ public final class DBStoreBuilder {
 
       if (options == null) {
         LOG.debug("using default column family options for table: {}", name);
-        tableConfigs.add(new TableConfig(name, getDefaultCfOptions()));
+        tableConfigs.add(new TableConfig(name,
+                getCfOptions(rocksDbCfWriteBufferSize)));
       } else {
         tableConfigs.add(new TableConfig(name, options));
       }
@@ -298,6 +308,18 @@ public final class DBStoreBuilder {
         .orElseGet(defaultCfProfile::getColumnFamilyOptions);
   }
 
+  /**
+   * Get default column family options, but with column family write buffer
+   * size limit overridden.
+   * @param writeBufferSize Specify column family write buffer size.
+   * @return ManagedColumnFamilyOptions
+   */
+  private ManagedColumnFamilyOptions getCfOptions(long writeBufferSize) {
+    ManagedColumnFamilyOptions cfOpts = getDefaultCfOptions();
+    cfOpts.setWriteBufferSize(writeBufferSize);
+    return cfOpts;
+  }
+
   /**
    * Attempts to get RocksDB {@link ManagedDBOptions} from an ini config
    * file. If that file does not exist, the value of {@code defaultDBOptions}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index c6af10fbc4..b288364f73 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -85,6 +85,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-test-utils</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java
new file mode 100644
index 0000000000..a7cfa27aaf
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ozone.rocksdiff;
+
+/**
+ * Node in the compaction DAG that represents an SST file.
+ */
+public class CompactionNode {
+  // Name of the SST file
+  private final String fileName;
+  // The last snapshot created before this node came into existence
+  private final String snapshotId;
+  private final long snapshotGeneration;
+  private final long totalNumberOfKeys;
+  private long cumulativeKeysReverseTraversal;
+
+  /**
+   * CompactionNode constructor.
+   * @param file SST file (filename without extension)
+   * @param ssId snapshotId field. Added here for improved debuggability only
+   * @param numKeys Number of keys in the SST
+   * @param seqNum Snapshot generation (sequence number)
+   */
+  public CompactionNode(String file, String ssId, long numKeys, long seqNum) {
+    fileName = file;
+    snapshotId = ssId;
+    totalNumberOfKeys = numKeys;
+    snapshotGeneration = seqNum;
+    cumulativeKeysReverseTraversal = 0L;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Node{%s}", fileName);
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public String getSnapshotId() {
+    return snapshotId;
+  }
+
+  public long getSnapshotGeneration() {
+    return snapshotGeneration;
+  }
+
+  public long getTotalNumberOfKeys() {
+    return totalNumberOfKeys;
+  }
+
+  public long getCumulativeKeysReverseTraversal() {
+    return cumulativeKeysReverseTraversal;
+  }
+
+  public void setCumulativeKeysReverseTraversal(
+      long cumulativeKeysReverseTraversal) {
+    this.cumulativeKeysReverseTraversal = cumulativeKeysReverseTraversal;
+  }
+
+  public void addCumulativeKeysReverseTraversal(long diff) {
+    this.cumulativeKeysReverseTraversal += diff;
+  }
+}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
new file mode 100644
index 0000000000..fb82bd280f
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ozone.rocksdiff;
+
+import java.util.Map;
+
+/**
+ * Snapshot information node class for the differ.
+ */
+public class DifferSnapshotInfo {
+  private final String dbPath;
+  private final String snapshotID;
+  private final long snapshotGeneration;
+
+  private final Map<String, String> tablePrefixes;
+
+  public DifferSnapshotInfo(String db, String id, long gen,
+                            Map<String, String> prefixes) {
+    dbPath = db;
+    snapshotID = id;
+    snapshotGeneration = gen;
+    tablePrefixes = prefixes;
+  }
+
+  public String getDbPath() {
+    return dbPath;
+  }
+
+  public String getSnapshotID() {
+    return snapshotID;
+  }
+
+  public long getSnapshotGeneration() {
+    return snapshotGeneration;
+  }
+
+  public Map<String, String> getTablePrefixes() {
+    return tablePrefixes;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("DifferSnapshotInfo{dbPath='%s', snapshotID='%s', " +
+                    "snapshotGeneration=%d, tablePrefixes size=%s}",
+            dbPath, snapshotID, snapshotGeneration, tablePrefixes.size());
+  }
+
+}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 4ad8a0f9b2..7a8ade9f5f 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -18,6 +18,7 @@
 package org.apache.ozone.rocksdiff;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.graph.GraphBuilder;
 import com.google.common.graph.MutableGraph;
 import org.apache.commons.lang3.StringUtils;
@@ -42,6 +43,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -49,10 +51,9 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.Scanner;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -132,12 +133,20 @@ public class RocksDBCheckpointDiffer {
   private static final int LONG_MAX_STRLEN =
       String.valueOf(Long.MAX_VALUE).length();
 
+  /**
+   * Used during DAG reconstruction.
+   */
   private long reconstructionSnapshotGeneration;
+  private String reconstructionLastSnapshotID;
 
   /**
    * Dummy object that acts as a write lock in compaction listener.
    */
   private final Object compactionListenerWriteLock = new Object();
+  /**
+   * Flag for testing. Skips SST file summary reader.
+   */
+  private boolean skipGetSSTFileSummary = false;
 
   /**
    * Constructor.
@@ -167,6 +176,16 @@ public class RocksDBCheckpointDiffer {
     this.activeDBLocationStr = activeDBLocation.toString() + "/";
   }
 
+  /**
+   * This constructor is only meant for unit testing.
+   */
+  @VisibleForTesting
+  RocksDBCheckpointDiffer() {
+    this.skipGetSSTFileSummary = true;
+    this.sstBackupDir = null;
+    this.activeDBLocationStr = null;
+  }
+
   private void setCompactionLogDir(String metadataDir,
       String compactionLogDirName) {
 
@@ -232,31 +251,6 @@ public class RocksDBCheckpointDiffer {
     appendToCurrentCompactionLog("");
   }
 
-  // Node in the DAG to represent an SST file
-  private static class CompactionNode {
-    // Name of the SST file
-    private final String fileName;
-    // The last snapshot created before this node came into existence
-    private final String snapshotId;
-    private final long snapshotGeneration;
-    private final long totalNumberOfKeys;
-    private long cumulativeKeysReverseTraversal;
-
-    CompactionNode(String file, String ssId, long numKeys, long seqNum) {
-      fileName = file;
-      // Retained for debuggability. Unused for now.
-      snapshotId = ssId;
-      totalNumberOfKeys = numKeys;
-      snapshotGeneration = seqNum;
-      cumulativeKeysReverseTraversal = 0L;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("Node{%s}", fileName);
-    }
-  }
-
   // Hash table to track CompactionNode for a given SST File.
   private final ConcurrentHashMap<String, CompactionNode> compactionNodeMap =
       new ConcurrentHashMap<>();
@@ -317,8 +311,10 @@ public class RocksDBCheckpointDiffer {
    * snapshot (RDB checkpoint) is taken.
    * @param sequenceNum RDB sequence number
    */
-  public void appendSequenceNumberToCompactionLog(long sequenceNum) {
-    final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum + "\n";
+  public void appendSequenceNumberToCompactionLog(long sequenceNum,
+      String snapshotID) {
+    final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum +
+        " " + snapshotID + "\n";
     appendToCurrentCompactionLog(line);
   }
 
@@ -357,6 +353,12 @@ public class RocksDBCheckpointDiffer {
       public void onCompactionBegin(RocksDB db,
           CompactionJobInfo compactionJobInfo) {
 
+        // TODO: Skip (return) if no snapshot has been taken yet
+
+        // Note the current compaction listener implementation does not
+        // differentiate which column family each SST store. It is tracking
+        // all SST files.
+
         synchronized (compactionListenerWriteLock) {
 
           if (compactionJobInfo.inputFiles().size() == 0) {
@@ -374,6 +376,12 @@ public class RocksDBCheckpointDiffer {
             Path srcFile = Paths.get(file);
             try {
               Files.createLink(link, srcFile);
+            } catch (FileAlreadyExistsException ignored) {
+              // This could happen if a previous compaction is a "trivial move",
+              // where output SSTs files are exactly the same as input files.
+              // Those SSTs are simply moved to the next level without rewrites
+              // or renames.
+              LOG.debug("SST file already exists: {}", file);
             } catch (IOException e) {
               LOG.error("Exception in creating hard link for {}", file);
               throw new RuntimeException("Failed to create hard link", e);
@@ -391,6 +399,8 @@ public class RocksDBCheckpointDiffer {
       public void onCompactionCompleted(RocksDB db,
           CompactionJobInfo compactionJobInfo) {
 
+        // TODO: Skip (return) if no snapshot has been taken yet
+
         synchronized (compactionListenerWriteLock) {
 
           if (compactionJobInfo.inputFiles().isEmpty()) {
@@ -440,7 +450,9 @@ public class RocksDBCheckpointDiffer {
           appendToCurrentCompactionLog(sb.toString());
 
           // Populate the DAG
-          populateCompactionDAG(inputFiles, outputFiles,
+          // TODO: Once SnapshotChainManager is put into use, set snapshotID to
+          //  snapshotChainManager.getLatestGlobalSnapshot()
+          populateCompactionDAG(inputFiles, outputFiles, null,
               db.getLatestSequenceNumber());
 /*
           if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
@@ -458,6 +470,16 @@ public class RocksDBCheckpointDiffer {
    * @return number of keys
    */
   private long getSSTFileSummary(String filename) throws RocksDBException {
+
+    if (skipGetSSTFileSummary) {
+      // For testing only
+      return 1L;
+    }
+
+    if (!filename.endsWith(SST_FILE_EXTENSION)) {
+      filename += SST_FILE_EXTENSION;
+    }
+
     Options option = new Options();
     SstFileReader reader = new SstFileReader(option);
 
@@ -542,6 +564,8 @@ public class RocksDBCheckpointDiffer {
 
       rocksDB = RocksDB.openReadOnly(dbOptions, dbPathArg,
           cfDescriptors, columnFamilyHandles);
+      // Note it retrieves only the selected column families by the descriptor
+      // i.e. keyTable, directoryTable, fileTable
       List<LiveFileMetaData> liveFileMetaDataList =
           rocksDB.getLiveFilesMetaData();
       LOG.debug("SST File Metadata for DB: " + dbPathArg);
@@ -565,7 +589,7 @@ public class RocksDBCheckpointDiffer {
   /**
    * Process each line of compaction log text file input and populate the DAG.
    */
-  private synchronized void processCompactionLogLine(String line) {
+  synchronized void processCompactionLogLine(String line) {
 
     LOG.debug("Processing line: {}", line);
 
@@ -573,26 +597,34 @@ public class RocksDBCheckpointDiffer {
       // Skip comments
       LOG.debug("Comment line, skipped");
     } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
-      // Read sequence number
-      LOG.debug("Reading sequence number as snapshot generation");
-      final String seqNumStr =
+      // Read sequence number, and snapshot ID
+      LOG.debug("Reading sequence number as snapshot generation, "
+          + "and snapshot ID");
+      final String trimmedStr =
           line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+      final Scanner input = new Scanner(trimmedStr);
       // This would the snapshot generation for the nodes to come
-      reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+      reconstructionSnapshotGeneration = input.nextLong();
+      // This is the snapshotID assigned to every single CompactionNode to come
+      reconstructionLastSnapshotID = input.nextLine().trim();
     } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
       // Read compaction log entry
 
       // Trim the beginning
-      line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+      line = line.substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
       final String[] io = line.split(":");
       if (io.length != 2) {
-        LOG.error("Invalid line in compaction log: {}", line);
+        if (line.endsWith(":")) {
+          LOG.debug("Ignoring compaction log line for SST deletion");
+        } else {
+          LOG.error("Invalid line in compaction log: {}", line);
+        }
         return;
       }
       final String[] inputFiles = io[0].split(",");
       final String[] outputFiles = io[1].split(",");
-      populateCompactionDAG(asList(inputFiles),
-          asList(outputFiles), reconstructionSnapshotGeneration);
+      populateCompactionDAG(asList(inputFiles), asList(outputFiles),
+          reconstructionLastSnapshotID, reconstructionSnapshotGeneration);
     } else {
       LOG.error("Invalid line in compaction log: {}", line);
     }
@@ -635,47 +667,6 @@ public class RocksDBCheckpointDiffer {
     }
   }
 
-  /**
-   * Snapshot information node class for the differ.
-   */
-  public static class DifferSnapshotInfo {
-    private final String dbPath;
-    private final String snapshotID;
-    private final long snapshotGeneration;
-    private final Map<String, String> tablePrefixes;
-
-    public DifferSnapshotInfo(String db, String id, long gen,
-        Map<String, String> prefixes) {
-      dbPath = db;
-      snapshotID = id;
-      snapshotGeneration = gen;
-      tablePrefixes = prefixes;
-    }
-
-    public String getDbPath() {
-      return dbPath;
-    }
-
-    public String getSnapshotID() {
-      return snapshotID;
-    }
-
-    public long getSnapshotGeneration() {
-      return snapshotGeneration;
-    }
-
-    public Map<String, String> getTablePrefixes() {
-      return tablePrefixes;
-    }
-
-    @Override
-    public String toString() {
-      return "DifferSnapshotInfo{" + "dbPath='" + dbPath + '\''
-          + ", snapshotID='" + snapshotID + '\'' + ", snapshotGeneration="
-          + snapshotGeneration + '}';
-    }
-  }
-
   /**
    * Get a list of SST files that differs between src and destination snapshots.
    * <p>
@@ -687,20 +678,23 @@ public class RocksDBCheckpointDiffer {
   public synchronized List<String> getSSTDiffList(
       DifferSnapshotInfo src, DifferSnapshotInfo dest) {
 
-    HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
-    HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
+    // TODO: Reject or swap if dest is taken after src, once snapshot chain
+    //  integration is done.
+
+    HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.getDbPath());
+    HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.getDbPath());
 
     HashSet<String> fwdDAGSameFiles = new HashSet<>();
     HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
 
     LOG.debug("Doing forward diff from src '{}' to dest '{}'",
-        src.dbPath, dest.dbPath);
+        src.getDbPath(), dest.getDbPath());
     internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
         forwardCompactionDAG, fwdDAGSameFiles, fwdDAGDifferentFiles);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
-          dest.dbPath + "':");
+      LOG.debug("Result of diff from src '" + src.getDbPath() + "' to dest '" +
+          dest.getDbPath() + "':");
       StringBuilder logSB = new StringBuilder();
 
       logSB.append("Fwd DAG same SST files:      ");
@@ -758,17 +752,27 @@ public class RocksDBCheckpointDiffer {
 
   /**
    * Core getSSTDiffList logic.
+   * <p>
+   * For each SST in the src snapshot, traverse the DAG to find its final
+   * successors.  If any of those successors match an SST in the dest
+   * snapshot, add it to the sameFiles map (as it doesn't need further
+   * diffing).  Otherwise, add it to the differentFiles map, as it will
+   * need further diffing.
    */
-  private void internalGetSSTDiffList(
+  void internalGetSSTDiffList(
       DifferSnapshotInfo src, DifferSnapshotInfo dest,
-      HashSet<String> srcSnapFiles, HashSet<String> destSnapFiles,
+      Set<String> srcSnapFiles, Set<String> destSnapFiles,
       MutableGraph<CompactionNode> mutableGraph,
-      HashSet<String> sameFiles, HashSet<String> differentFiles) {
+      Set<String> sameFiles, Set<String> differentFiles) {
+
+    // Sanity check
+    Preconditions.checkArgument(sameFiles.isEmpty(), "Set must be empty");
+    Preconditions.checkArgument(differentFiles.isEmpty(), "Set must be empty");
 
     for (String fileName : srcSnapFiles) {
       if (destSnapFiles.contains(fileName)) {
         LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
-            src.dbPath, dest.dbPath, fileName);
+            src.getDbPath(), dest.getDbPath(), fileName);
         sameFiles.add(fileName);
         continue;
       }
@@ -776,7 +780,7 @@ public class RocksDBCheckpointDiffer {
       CompactionNode infileNode = compactionNodeMap.get(fileName);
       if (infileNode == null) {
         LOG.debug("Source '{}' SST file '{}' is never compacted",
-            src.dbPath, fileName);
+            src.getDbPath(), fileName);
         differentFiles.add(fileName);
         continue;
       }
@@ -787,19 +791,33 @@ public class RocksDBCheckpointDiffer {
       // Traversal level/depth indicator for debug print
       int level = 1;
       while (!currentLevel.isEmpty()) {
-        LOG.debug("BFS level: {}. Current level has {} nodes.",
+        LOG.debug("Traversal level: {}. Current level has {} nodes.",
             level++, currentLevel.size());
 
+        if (level >= 1000000) {
+          final String errorMsg = String.format(
+                  "Graph traversal level exceeded allowed maximum (%d). "
+                  + "This could be due to invalid input generating a "
+                  + "loop in the traversal path. Same SSTs found so far: %s, "
+                  + "different SSTs: %s", level, sameFiles, differentFiles);
+          LOG.error(errorMsg);
+          // Clear output in case of error. Expect fall back to full diff
+          sameFiles.clear();
+          differentFiles.clear();
+          // TODO: Revisit error handling here. Use custom exception?
+          throw new RuntimeException(errorMsg);
+        }
+
         final Set<CompactionNode> nextLevel = new HashSet<>();
         for (CompactionNode current : currentLevel) {
-          LOG.debug("Processing node: {}", current.fileName);
-          if (current.snapshotGeneration <= dest.snapshotGeneration) {
+          LOG.debug("Processing node: {}", current.getFileName());
+          if (current.getSnapshotGeneration() < dest.getSnapshotGeneration()) {
             LOG.debug("Current node's snapshot generation '{}' "
                     + "reached destination snapshot's '{}'. "
                     + "Src '{}' and dest '{}' have different SST file: '{}'",
-                current.snapshotGeneration, dest.snapshotGeneration,
-                src.dbPath, dest.dbPath, current.fileName);
-            differentFiles.add(current.fileName);
+                current.getSnapshotGeneration(), dest.getSnapshotGeneration(),
+                src.getDbPath(), dest.getDbPath(), current.getFileName());
+            differentFiles.add(current.getFileName());
             continue;
           }
 
@@ -807,28 +825,28 @@ public class RocksDBCheckpointDiffer {
           if (successors.isEmpty()) {
             LOG.debug("No further compaction happened to the current file. " +
                 "Src '{}' and dest '{}' have different file: {}",
-                src.dbPath, dest.dbPath, current.fileName);
-            differentFiles.add(current.fileName);
+                src.getDbPath(), dest.getDbPath(), current.getFileName());
+            differentFiles.add(current.getFileName());
             continue;
           }
 
           for (CompactionNode node : successors) {
-            if (sameFiles.contains(node.fileName) ||
-                differentFiles.contains(node.fileName)) {
-              LOG.debug("Skipping known processed SST: {}", node.fileName);
+            if (sameFiles.contains(node.getFileName()) ||
+                differentFiles.contains(node.getFileName())) {
+              LOG.debug("Skipping known processed SST: {}", node.getFileName());
               continue;
             }
 
-            if (destSnapFiles.contains(node.fileName)) {
+            if (destSnapFiles.contains(node.getFileName())) {
               LOG.debug("Src '{}' and dest '{}' have the same SST: {}",
-                  src.dbPath, dest.dbPath, node.fileName);
-              sameFiles.add(node.fileName);
+                  src.getDbPath(), dest.getDbPath(), node.getFileName());
+              sameFiles.add(node.getFileName());
               continue;
             }
 
             // Queue different SST to the next level
             LOG.debug("Src '{}' and dest '{}' have a different SST: {}",
-                src.dbPath, dest.dbPath, node.fileName);
+                src.getDbPath(), dest.getDbPath(), node.getFileName());
             nextLevel.add(node);
           }
         }
@@ -840,7 +858,7 @@ public class RocksDBCheckpointDiffer {
   static class NodeComparator
       implements Comparator<CompactionNode>, Serializable {
     public int compare(CompactionNode a, CompactionNode b) {
-      return a.fileName.compareToIgnoreCase(b.fileName);
+      return a.getFileName().compareToIgnoreCase(b.getFileName());
     }
 
     @Override
@@ -854,78 +872,11 @@ public class RocksDBCheckpointDiffer {
     List<CompactionNode> nodeList = compactionNodeMap.values().stream()
         .sorted(new NodeComparator()).collect(Collectors.toList());
     for (CompactionNode n : nodeList) {
-      LOG.debug("File '{}' total keys: {}", n.fileName, n.totalNumberOfKeys);
-      LOG.debug("File '{}' cumulative keys: {}", n.fileName,
-          n.cumulativeKeysReverseTraversal);
-    }
-  }
-
-  @VisibleForTesting
-  public synchronized void printMutableGraphFromAGivenNode(String fileName,
-      int sstLevel, MutableGraph<CompactionNode> mutableGraph) {
-
-    CompactionNode infileNode = compactionNodeMap.get(fileName);
-    if (infileNode == null) {
-      return;
-    }
-    LOG.debug("Expanding file: {}. SST compaction level: {}",
-        fileName, sstLevel);
-    Set<CompactionNode> currentLevel = new HashSet<>();
-    currentLevel.add(infileNode);
-    int levelCounter = 1;
-    while (!currentLevel.isEmpty()) {
-      LOG.debug("DAG Level: {}", levelCounter++);
-      final Set<CompactionNode> nextLevel = new HashSet<>();
-      StringBuilder sb = new StringBuilder();
-      for (CompactionNode current : currentLevel) {
-        Set<CompactionNode> successors = mutableGraph.successors(current);
-        for (CompactionNode succNode : successors) {
-          sb.append(succNode.fileName).append(" ");
-          nextLevel.add(succNode);
-        }
-      }
-      LOG.debug("{}", sb);
-      currentLevel = nextLevel;
-    }
-  }
-
-  synchronized void printMutableGraph(String srcSnapId, String destSnapId,
-      MutableGraph<CompactionNode> mutableGraph) {
-
-    LOG.debug("Gathering all SST file nodes from src '{}' to dest '{}'",
-        srcSnapId, destSnapId);
-
-    final Queue<CompactionNode> nodeQueue = new LinkedList<>();
-    // Queue source snapshot SST file nodes
-    for (CompactionNode node : mutableGraph.nodes()) {
-      if (srcSnapId == null ||
-          node.snapshotId.compareToIgnoreCase(srcSnapId) == 0) {
-        nodeQueue.add(node);
-      }
-    }
-
-    final Set<CompactionNode> allNodesSet = new HashSet<>();
-    while (!nodeQueue.isEmpty()) {
-      CompactionNode node = nodeQueue.poll();
-      Set<CompactionNode> succSet = mutableGraph.successors(node);
-      LOG.debug("Current node: {}", node);
-      if (succSet.isEmpty()) {
-        LOG.debug("Has no successor node");
-        allNodesSet.add(node);
-        continue;
-      }
-      for (CompactionNode succNode : succSet) {
-        LOG.debug("Has successor node: {}", succNode);
-        if (srcSnapId == null ||
-            succNode.snapshotId.compareToIgnoreCase(destSnapId) == 0) {
-          allNodesSet.add(succNode);
-          continue;
-        }
-        nodeQueue.add(succNode);
-      }
+      LOG.debug("File '{}' total keys: {}",
+          n.getFileName(), n.getTotalNumberOfKeys());
+      LOG.debug("File '{}' cumulative keys: {}",
+          n.getFileName(), n.getCumulativeKeysReverseTraversal());
     }
-
-    LOG.debug("Files are: {}", allNodesSet);
   }
 
   public MutableGraph<CompactionNode> getForwardCompactionDAG() {
@@ -940,14 +891,16 @@ public class RocksDBCheckpointDiffer {
    * Helper method to add a new file node to the DAG.
    * @return CompactionNode
    */
-  private CompactionNode addNodeToDAG(String file, long seqNum) {
+  private CompactionNode addNodeToDAG(String file, String snapshotID,
+      long seqNum) {
     long numKeys = 0L;
     try {
       numKeys = getSSTFileSummary(file);
     } catch (RocksDBException e) {
       LOG.warn("Can't get num of keys in SST '{}': {}", file, e.getMessage());
     }
-    CompactionNode fileNode = new CompactionNode(file, null, numKeys, seqNum);
+    CompactionNode fileNode = new CompactionNode(
+        file, snapshotID, numKeys, seqNum);
     forwardCompactionDAG.addNode(fileNode);
     backwardCompactionDAG.addNode(fileNode);
 
@@ -956,9 +909,14 @@ public class RocksDBCheckpointDiffer {
 
   /**
    * Populate the compaction DAG with input and output SST files lists.
+   * @param inputFiles List of compaction input files.
+   * @param outputFiles List of compaction output files.
+   * @param snapshotId Snapshot ID for debugging purpose. In fact, this can be
+   *                   arbitrary String as long as it helps debugging.
+   * @param seqNum DB transaction sequence number.
    */
   private void populateCompactionDAG(List<String> inputFiles,
-      List<String> outputFiles, long seqNum) {
+      List<String> outputFiles, String snapshotId, long seqNum) {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Input files: {} -> Output files: {}", inputFiles, outputFiles);
@@ -966,13 +924,13 @@ public class RocksDBCheckpointDiffer {
 
     for (String outfile : outputFiles) {
       final CompactionNode outfileNode = compactionNodeMap.computeIfAbsent(
-          outfile, file -> addNodeToDAG(file, seqNum));
+          outfile, file -> addNodeToDAG(file, snapshotId, seqNum));
 
       for (String infile : inputFiles) {
         final CompactionNode infileNode = compactionNodeMap.computeIfAbsent(
-            infile, file -> addNodeToDAG(file, seqNum));
+            infile, file -> addNodeToDAG(file, snapshotId, seqNum));
         // Draw the edges
-        if (!outfileNode.fileName.equals(infileNode.fileName)) {
+        if (!outfileNode.getFileName().equals(infileNode.getFileName())) {
           forwardCompactionDAG.putEdge(outfileNode, infileNode);
           backwardCompactionDAG.putEdge(infileNode, outfileNode);
         }
@@ -981,65 +939,6 @@ public class RocksDBCheckpointDiffer {
 
   }
 
-  @VisibleForTesting
-  public synchronized void traverseGraph(
-      MutableGraph<CompactionNode> reverseMutableGraph,
-      MutableGraph<CompactionNode> fwdMutableGraph) {
-
-    List<CompactionNode> nodeList = compactionNodeMap.values().stream()
-        .sorted(new NodeComparator()).collect(Collectors.toList());
-
-    for (CompactionNode infileNode : nodeList) {
-      // fist go through fwdGraph to find nodes that don't have successors.
-      // These nodes will be the top level nodes in reverse graph
-      Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode);
-      if (successors.size() == 0) {
-        LOG.debug("No successors. Cumulative keys: {}, total keys: {}",
-            infileNode.cumulativeKeysReverseTraversal,
-            infileNode.totalNumberOfKeys);
-        infileNode.cumulativeKeysReverseTraversal =
-            infileNode.totalNumberOfKeys;
-      }
-    }
-
-    HashSet<CompactionNode> visited = new HashSet<>();
-    for (CompactionNode infileNode : nodeList) {
-      if (visited.contains(infileNode)) {
-        continue;
-      }
-      visited.add(infileNode);
-      LOG.debug("Visiting node '{}'", infileNode.fileName);
-      Set<CompactionNode> currentLevel = new HashSet<>();
-      currentLevel.add(infileNode);
-      int level = 1;
-      while (!currentLevel.isEmpty()) {
-        LOG.debug("BFS Level: {}. Current level has {} nodes",
-            level++, currentLevel.size());
-        final Set<CompactionNode> nextLevel = new HashSet<>();
-        for (CompactionNode current : currentLevel) {
-          LOG.debug("Expanding node: {}", current.fileName);
-          Set<CompactionNode> successors =
-              reverseMutableGraph.successors(current);
-          if (successors.isEmpty()) {
-            LOG.debug("No successors. Cumulative keys: {}",
-                current.cumulativeKeysReverseTraversal);
-            continue;
-          }
-          for (CompactionNode node : successors) {
-            LOG.debug("Adding to the next level: {}", node.fileName);
-            LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}",
-                node.fileName, node.cumulativeKeysReverseTraversal,
-                current.fileName, current.totalNumberOfKeys);
-            node.cumulativeKeysReverseTraversal +=
-                current.cumulativeKeysReverseTraversal;
-            nextLevel.add(node);
-          }
-        }
-        currentLevel = nextLevel;
-      }
-    }
-  }
-
   @VisibleForTesting
   public boolean debugEnabled(Integer level) {
     return DEBUG_LEVEL.contains(level);
@@ -1050,4 +949,9 @@ public class RocksDBCheckpointDiffer {
     return LOG;
   }
 
+  @VisibleForTesting
+  public ConcurrentHashMap<String, CompactionNode> getCompactionNodeMap() {
+    return compactionNodeMap;
+  }
+
 }
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index aa19220042..590e740575 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -26,17 +26,30 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;
+import com.google.common.graph.MutableGraph;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -90,8 +103,167 @@ public class TestRocksDBCheckpointDiffer {
     GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO);
   }
 
+  /**
+   * Test cases for testGetSSTDiffListWithoutDB.
+   */
+  private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
+
+    DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo(
+        "/path/to/dbcp1", "ssUUID1", 3008L, null);
+    DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo(
+        "/path/to/dbcp2", "ssUUID2", 14980L, null);
+    DifferSnapshotInfo snapshotInfo3 = new DifferSnapshotInfo(
+        "/path/to/dbcp3", "ssUUID3", 17975L, null);
+    DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
+        "/path/to/dbcp4", "ssUUID4", 18000L, null);
+
+    Set<String> snapshotSstFiles1 = new HashSet<>(asList(
+        "000059", "000053"));
+    Set<String> snapshotSstFiles2 = new HashSet<>(asList(
+        "000088", "000059", "000053", "000095"));
+    Set<String> snapshotSstFiles3 = new HashSet<>(asList(
+        "000088", "000105", "000059", "000053", "000095"));
+    Set<String> snapshotSstFiles4 = new HashSet<>(asList(
+        "000088", "000105", "000059", "000053", "000095", "000108"));
+    Set<String> snapshotSstFiles1Alt1 = new HashSet<>(asList(
+        "000059", "000053", "000066"));
+    Set<String> snapshotSstFiles1Alt2 = new HashSet<>(asList(
+        "000059", "000053", "000052"));
+    Set<String> snapshotSstFiles2Alt2 = new HashSet<>(asList(
+        "000088", "000059", "000053", "000095", "000099"));
+    Set<String> snapshotSstFiles2Alt3 = new HashSet<>(asList(
+        "000088", "000059", "000053", "000062"));
+
+    return Stream.of(
+        Arguments.of("Test 1: Regular case. Expands expandable " +
+                "SSTs in the initial diff.",
+            snapshotInfo3,
+            snapshotInfo1,
+            snapshotSstFiles3,
+            snapshotSstFiles1,
+            new HashSet<>(asList("000059", "000053")),
+            new HashSet<>(asList(
+                "000066", "000105", "000080", "000087", "000073", "000095")),
+            false),
+        Arguments.of("Test 2: Crafted input: One source " +
+                "('to' snapshot) SST file is never compacted (newly flushed)",
+            snapshotInfo4,
+            snapshotInfo3,
+            snapshotSstFiles4,
+            snapshotSstFiles3,
+            new HashSet<>(asList(
+                "000088", "000105", "000059", "000053", "000095")),
+            new HashSet<>(asList("000108")),
+            false),
+        Arguments.of("Test 3: Crafted input: Same SST files " +
+                "found during SST expansion",
+            snapshotInfo2,
+            snapshotInfo1,
+            snapshotSstFiles2,
+            snapshotSstFiles1Alt1,
+            new HashSet<>(asList("000066", "000059", "000053")),
+            new HashSet<>(asList(
+                "000080", "000087", "000073", "000095")),
+            false),
+        Arguments.of("Test 4: Crafted input: Skipping known " +
+                "processed SST.",
+            snapshotInfo2,
+            snapshotInfo1,
+            snapshotSstFiles2Alt2,
+            snapshotSstFiles1Alt2,
+            new HashSet<>(),
+            new HashSet<>(),
+            true),
+        Arguments.of("Test 5: Hit snapshot generation early exit " +
+                "condition",
+            snapshotInfo2,
+            snapshotInfo1,
+            snapshotSstFiles2Alt3,
+            snapshotSstFiles1,
+            new HashSet<>(asList("000059", "000053")),
+            new HashSet<>(asList(
+                "000066", "000080", "000087", "000073", "000062")),
+            false)
+    );
+  }
+
+  /**
+   * Tests core SST diff list logic. Does not involve DB.
+   * Focuses on testing edge cases in internalGetSSTDiffList().
+   */
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("casesGetSSTDiffListWithoutDB")
+  @SuppressWarnings("parameternumber")
+  public void testGetSSTDiffListWithoutDB(String description,
+      DifferSnapshotInfo srcSnapshot,
+      DifferSnapshotInfo destSnapshot,
+      Set<String> srcSnapshotSstFiles,
+      Set<String> destSnapshotSstFiles,
+      Set<String> expectedSameSstFiles,
+      Set<String> expectedDiffSstFiles,
+      boolean expectingException) {
+
+    RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer();
+    boolean exceptionThrown = false;
+
+    String compactionLog = ""
+        + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5\n"  // Snapshot 0
+        + "C 000001,000002:000062\n"
+          // Additional "compaction" to trigger and test early exit condition
+        + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5\n"  // Snapshot 1
+        + "C 000068,000062:000069\n"  // Regular compaction
+        + "C 000071,000064,000060,000052:000071,000064,000060,000052\n"
+          // Trivial move
+        + "C 000073,000066:000074\n"
+        + "C 000082,000076,000069:000083\n"
+        + "C 000087,000080,000074:000088\n"
+        + "C 000093,000090,000083:\n"  // Deletion?
+        + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd\n"  // Snapshot 2
+        + "C 000098,000096,000085,000078,000071,000064,000060,000052:000099\n"
+        + "C 000105,000095,000088:000107\n"
+        + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea\n";  // Snapshot 3
+
+    // Construct DAG from compaction log input
+    Arrays.stream(compactionLog.split("\n")).forEach(
+        differ::processCompactionLogLine);
+
+    Set<String> actualSameSstFiles = new HashSet<>();
+    Set<String> actualDiffSstFiles = new HashSet<>();
+
+    try {
+      differ.internalGetSSTDiffList(
+              srcSnapshot,
+              destSnapshot,
+              srcSnapshotSstFiles,
+              destSnapshotSstFiles,
+              differ.getForwardCompactionDAG(),
+              actualSameSstFiles,
+              actualDiffSstFiles);
+    } catch (RuntimeException rtEx) {
+      if (!expectingException) {
+        fail("Unexpected exception thrown in test.");
+      } else {
+        exceptionThrown = true;
+      }
+    }
+
+    // Check same and different SST files result
+    Assertions.assertEquals(expectedSameSstFiles, actualSameSstFiles);
+    Assertions.assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
+
+    if (expectingException && !exceptionThrown) {
+      fail("Expecting exception but none thrown.");
+    }
+  }
+
+  /**
+   * Tests DB listener (compaction log generation, SST backup),
+   * SST file list diff.
+   * <p>
+   * Does actual DB write, flush, compaction.
+   */
   @Test
-  void testMain() throws Exception {
+  void testDifferWithDB() throws Exception {
 
     final String clDirStr = "compaction-log";
     // Delete the compaction log dir for the test, if it exists
@@ -121,12 +293,21 @@ public class TestRocksDBCheckpointDiffer {
       printAllSnapshots();
     }
 
-    differ.traverseGraph(
+    traverseGraph(differ.getCompactionNodeMap(),
         differ.getBackwardCompactionDAG(),
         differ.getForwardCompactionDAG());
 
     diffAllSnapshots(differ);
 
+    // Confirm correct links created
+    try (Stream<Path> sstPathStream = Files.list(sstDir.toPath())) {
+      List<String> expectedLinks = sstPathStream.map(Path::getFileName)
+              .map(Object::toString).sorted().collect(Collectors.toList());
+      Assertions.assertEquals(expectedLinks, asList(
+              "000015.sst", "000017.sst", "000019.sst", "000021.sst",
+              "000022.sst", "000024.sst", "000026.sst"));
+    }
+
     if (LOG.isDebugEnabled()) {
       differ.dumpCompactionNodeTable();
     }
@@ -215,7 +396,8 @@ public class TestRocksDBCheckpointDiffer {
     this.snapshots.add(currentSnapshot);
 
     // Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
-    differ.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber);
+    differ.appendSequenceNumberToCompactionLog(
+        dbLatestSequenceNumber, snapshotId);
 
     differ.setCurrentCompactionLog(dbLatestSequenceNumber);
 
@@ -300,158 +482,6 @@ public class TestRocksDBCheckpointDiffer {
     return directoryToBeDeleted.delete();
   }
 
-  /**
-   * RocksDB.DEFAULT_COLUMN_FAMILY.
-   */
-  private void updateRocksDBInstance(String dbPathArg, RocksDB rocksDB) {
-    System.out.println("Updating RocksDB instance at :" + dbPathArg);
-
-    try (Options options = new Options().setCreateIfMissing(true)) {
-      if (rocksDB == null) {
-        rocksDB = RocksDB.open(options, dbPathArg);
-      }
-
-      Random random = new Random();
-      // key-value
-      for (int i = 0; i < NUM_ROW; ++i) {
-        String generatedString = getRandomString(random, 7);
-        String keyStr = " MyUpdated" + generatedString + "StringKey" + i;
-        String valueStr = " My Updated" + generatedString + "StringValue" + i;
-        byte[] key = keyStr.getBytes(UTF_8);
-        rocksDB.put(key, valueStr.getBytes(UTF_8));
-        System.out.println(toStr(rocksDB.get(key)));
-      }
-    } catch (RocksDBException e) {
-      e.printStackTrace();
-    }
-  }
-
-  /**
-   * RocksDB.DEFAULT_COLUMN_FAMILY.
-   */
-  public void testDefaultColumnFamilyOriginal() {
-    System.out.println("testDefaultColumnFamily begin...");
-
-    try (Options options = new Options().setCreateIfMissing(true)) {
-      try (RocksDB rocksDB = RocksDB.open(options, "./rocksdb-data")) {
-        // key-value
-        byte[] key = "Hello".getBytes(UTF_8);
-        rocksDB.put(key, "World".getBytes(UTF_8));
-
-        System.out.println(toStr(rocksDB.get(key)));
-
-        rocksDB.put("SecondKey".getBytes(UTF_8), "SecondValue".getBytes(UTF_8));
-
-        // List
-        List<byte[]> keys = asList(key, "SecondKey".getBytes(UTF_8),
-            "missKey".getBytes(UTF_8));
-        List<byte[]> values = rocksDB.multiGetAsList(keys);
-        for (int i = 0; i < keys.size(); i++) {
-          System.out.println("multiGet " + toStr(keys.get(i)) + ":" +
-              (values.get(i) != null ? toStr(values.get(i)) : null));
-        }
-
-        // [key - value]
-        RocksIterator iter = rocksDB.newIterator();
-        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
-          System.out.println("iterator key:" + toStr(iter.key()) + ", " +
-              "iter value:" + toStr(iter.value()));
-        }
-
-        // key
-        rocksDB.delete(key);
-        System.out.println("after remove key:" + toStr(key));
-
-        iter = rocksDB.newIterator();
-        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
-          System.out.println("iterator key:" + toStr(iter.key()) + ", " +
-              "iter value:" + toStr(iter.value()));
-        }
-      }
-    } catch (RocksDBException e) {
-      e.printStackTrace();
-    }
-  }
-
-  // (table)
-  public void testCertainColumnFamily() {
-    System.out.println("\ntestCertainColumnFamily begin...");
-    try (ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
-        .optimizeUniversalStyleCompaction()) {
-      String cfName = "my-first-columnfamily";
-      // list of column family descriptors, first entry must always be
-      // default column family
-      final List<ColumnFamilyDescriptor> cfDescriptors = asList(
-          new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
-          new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), cfOpts)
-      );
-
-      List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
-      try (DBOptions dbOptions = new DBOptions()
-          .setCreateIfMissing(true)
-          .setCreateMissingColumnFamilies(true);
-          RocksDB rocksDB = RocksDB.open(dbOptions,
-              "./rocksdb-data-cf/", cfDescriptors, cfHandles)) {
-        ColumnFamilyHandle cfHandle = cfHandles.stream().filter(x -> {
-          try {
-            return (toStr(x.getName())).equals(cfName);
-          } catch (RocksDBException e) {
-            return false;
-          }
-        }).collect(Collectors.toList()).get(0);
-
-        // key/value
-        String key = "FirstKey";
-        rocksDB.put(cfHandles.get(0), key.getBytes(UTF_8),
-            "FirstValue".getBytes(UTF_8));
-        // key
-        byte[] getValue = rocksDB.get(cfHandles.get(0), key.getBytes(UTF_8));
-        LOG.debug("get Value: " + toStr(getValue));
-        // key/value
-        rocksDB.put(cfHandles.get(1), "SecondKey".getBytes(UTF_8),
-            "SecondValue".getBytes(UTF_8));
-
-        List<byte[]> keys = asList(key.getBytes(UTF_8),
-            "SecondKey".getBytes(UTF_8));
-        List<ColumnFamilyHandle> cfHandleList = asList(cfHandle, cfHandle);
-
-        // key
-        List<byte[]> values = rocksDB.multiGetAsList(cfHandleList, keys);
-        for (int i = 0; i < keys.size(); i++) {
-          LOG.debug("multiGet:" + toStr(keys.get(i)) + "--" +
-              (values.get(i) == null ? null : toStr(values.get(i))));
-        }
-
-        List<LiveFileMetaData> liveFileMetaDataList =
-            rocksDB.getLiveFilesMetaData();
-        for (LiveFileMetaData m : liveFileMetaDataList) {
-          System.out.println("Live File Metadata");
-          System.out.println("\tFile :" + m.fileName());
-          System.out.println("\tTable :" + toStr(m.columnFamilyName()));
-          System.out.println("\tKey Range :" + toStr(m.smallestKey()) +
-              " " + "<->" + toStr(m.largestKey()));
-        }
-
-        // key
-        rocksDB.delete(cfHandle, key.getBytes(UTF_8));
-
-        // key
-        RocksIterator iter = rocksDB.newIterator(cfHandle);
-        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
-          System.out.println("Iterator:" + toStr(iter.key()) + ":" +
-              toStr(iter.value()));
-        }
-      } finally {
-        // NOTE frees the column family handles before freeing the db
-        for (final ColumnFamilyHandle cfHandle : cfHandles) {
-          cfHandle.close();
-        }
-      }
-    } catch (RocksDBException e) {
-      e.printStackTrace();
-    } // frees the column family options
-  }
-
   // Read from a given RocksDB instance and optionally write all the
   // keys to a given file.
   void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file,
@@ -478,7 +508,9 @@ public class TestRocksDBCheckpointDiffer {
         LOG.debug("\tKey Range: {}", toStr(m.smallestKey())
             + " <-> " + toStr(m.largestKey()));
         if (differ.debugEnabled(DEBUG_DAG_LIVE_NODES)) {
-          differ.printMutableGraphFromAGivenNode(m.fileName(), m.level(),
+          printMutableGraphFromAGivenNode(
+              differ.getCompactionNodeMap(),
+              m.fileName(), m.level(),
               differ.getForwardCompactionDAG());
         }
       }
@@ -510,4 +542,140 @@ public class TestRocksDBCheckpointDiffer {
   private String toStr(byte[] bytes) {
     return new String(bytes, UTF_8);
   }
+
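+  /**
+   * Helper that traverses the graphs for testing.
+   * @param compactionNodeMap Map from SST file name to its CompactionNode.
+   * @param reverseMutableGraph Graph walked to accumulate cumulative keys.
+   * @param fwdMutableGraph Graph used to find nodes without successors.
+   */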
+  private void traverseGraph(
+      ConcurrentHashMap<String, CompactionNode> compactionNodeMap,
+      MutableGraph<CompactionNode> reverseMutableGraph,
+      MutableGraph<CompactionNode> fwdMutableGraph) {
+
+    List<CompactionNode> nodeList = compactionNodeMap.values().stream()
+        .sorted(new NodeComparator()).collect(Collectors.toList());
+
+    for (CompactionNode infileNode : nodeList) {
+      // first go through fwdGraph to find nodes that don't have successors.
+      // These nodes will be the top level nodes in reverse graph
+      Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode);
+      if (successors.size() == 0) {
+        LOG.debug("No successors. Cumulative keys: {}, total keys: {}",
+            infileNode.getCumulativeKeysReverseTraversal(),
+            infileNode.getTotalNumberOfKeys());
+        infileNode.setCumulativeKeysReverseTraversal(
+            infileNode.getTotalNumberOfKeys());
+      }
+    }
+
+    Set<CompactionNode> visited = new HashSet<>();
+
+    for (CompactionNode infileNode : nodeList) {
+      if (visited.contains(infileNode)) {
+        continue;
+      }
+      visited.add(infileNode);
+      LOG.debug("Visiting node '{}'", infileNode.getFileName());
+      Set<CompactionNode> currentLevel = new HashSet<>();
+      currentLevel.add(infileNode);
+      int level = 1;
+      while (!currentLevel.isEmpty()) {
+        LOG.debug("BFS Level: {}. Current level has {} nodes",
+            level++, currentLevel.size());
+        final Set<CompactionNode> nextLevel = new HashSet<>();
+        for (CompactionNode current : currentLevel) {
+          LOG.debug("Expanding node: {}", current.getFileName());
+          Set<CompactionNode> successors =
+              reverseMutableGraph.successors(current);
+          if (successors.isEmpty()) {
+            LOG.debug("No successors. Cumulative keys: {}",
+                current.getCumulativeKeysReverseTraversal());
+            continue;
+          }
+          for (CompactionNode node : successors) {
+            LOG.debug("Adding to the next level: {}", node.getFileName());
+            LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}",
+                node.getFileName(), node.getCumulativeKeysReverseTraversal(),
+                current.getFileName(), current.getTotalNumberOfKeys());
+            node.addCumulativeKeysReverseTraversal(
+                current.getCumulativeKeysReverseTraversal());
+            nextLevel.add(node);
+          }
+        }
+        currentLevel = nextLevel;
+      }
+    }
+  }
+
+  private void printMutableGraphFromAGivenNode(
+      ConcurrentHashMap<String, CompactionNode> compactionNodeMap,
+      String fileName,
+      int sstLevel,
+      MutableGraph<CompactionNode> mutableGraph) {
+
+    CompactionNode infileNode = compactionNodeMap.get(fileName);
+    if (infileNode == null) {
+      return;
+    }
+    LOG.debug("Expanding file: {}. SST compaction level: {}",
+        fileName, sstLevel);
+    Set<CompactionNode> currentLevel = new HashSet<>();
+    currentLevel.add(infileNode);
+    int levelCounter = 1;
+    while (!currentLevel.isEmpty()) {
+      LOG.debug("DAG Level: {}", levelCounter++);
+      final Set<CompactionNode> nextLevel = new HashSet<>();
+      StringBuilder sb = new StringBuilder();
+      for (CompactionNode current : currentLevel) {
+        Set<CompactionNode> successors = mutableGraph.successors(current);
+        for (CompactionNode succNode : successors) {
+          sb.append(succNode.getFileName()).append(" ");
+          nextLevel.add(succNode);
+        }
+      }
+      LOG.debug("{}", sb);
+      currentLevel = nextLevel;
+    }
+  }
+
+  private void printMutableGraph(String srcSnapId, String destSnapId,
+      MutableGraph<CompactionNode> mutableGraph) {
+
+    LOG.debug("Gathering all SST file nodes from src '{}' to dest '{}'",
+        srcSnapId, destSnapId);
+
+    final Queue<CompactionNode> nodeQueue = new LinkedList<>();
+    // Queue source snapshot SST file nodes
+    for (CompactionNode node : mutableGraph.nodes()) {
+      if (srcSnapId == null ||
+          node.getSnapshotId().compareToIgnoreCase(srcSnapId) == 0) {
+        nodeQueue.add(node);
+      }
+    }
+
+    final Set<CompactionNode> allNodesSet = new HashSet<>();
+    while (!nodeQueue.isEmpty()) {
+      CompactionNode node = nodeQueue.poll();
+      Set<CompactionNode> succSet = mutableGraph.successors(node);
+      LOG.debug("Current node: {}", node);
+      if (succSet.isEmpty()) {
+        LOG.debug("Has no successor node");
+        allNodesSet.add(node);
+        continue;
+      }
+      for (CompactionNode succNode : succSet) {
+        LOG.debug("Has successor node: {}", succNode);
+        if (srcSnapId == null ||
+            succNode.getSnapshotId().compareToIgnoreCase(destSnapId) == 0) {
+          allNodesSet.add(succNode);
+          continue;
+        }
+        nodeQueue.add(succNode);
+      }
+    }
+
+    LOG.debug("Files are: {}", allNodesSet);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index de02528e8a..c89f20c8c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -32,8 +33,8 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
-import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -49,7 +50,6 @@ import picocli.CommandLine;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -77,7 +77,6 @@ public class TestOMSnapshotDAG {
    * Create a MiniDFSCluster for testing.
    * <p>
    * Ozone is made active by setting OZONE_ENABLED = true
-   *
    */
   @BeforeAll
   public static void init() throws Exception {
@@ -94,7 +93,12 @@ public class TestOMSnapshotDAG {
     raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3));
     conf.setFromObject(raftClientConfig);
 
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
+    // Set DB CF write buffer to a much lower value so that flush and compaction
+    // happen much more frequently without having to create a lot of keys.
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE,
+        "256KB");
+
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
 
     store = cluster.getClient().getObjectStore();
@@ -160,28 +164,29 @@ public class TestOMSnapshotDAG {
   }
 
   @Test
-  void testZeroSizeKey()
-      throws IOException, InterruptedException, TimeoutException {
+  public void testDAGReconstruction()
+          throws IOException, InterruptedException, TimeoutException {
 
+    // Generate keys
     RandomKeyGenerator randomKeyGenerator =
         new RandomKeyGenerator(cluster.getConf());
     CommandLine cmd = new CommandLine(randomKeyGenerator);
     cmd.execute("--num-of-volumes", "1",
         "--num-of-buckets", "1",
-        "--num-of-keys", "600",
+        "--num-of-keys", "500",
         "--num-of-threads", "1",
-        "--key-size", "0",
+        "--key-size", "0",  // zero-size keys, since we don't need to test DNs
         "--factor", "THREE",
         "--type", "RATIS",
         "--validate-writes"
     );
 
-    Assertions.assertEquals(600L, randomKeyGenerator.getNumberOfKeysAdded());
-    Assertions.assertEquals(600L,
+    Assertions.assertEquals(500L, randomKeyGenerator.getNumberOfKeysAdded());
+    Assertions.assertEquals(500L,
         randomKeyGenerator.getSuccessfulValidationCount());
 
     List<OmVolumeArgs> volList = cluster.getOzoneManager()
-        .listAllVolumes("", "", 10);
+        .listAllVolumes("", "", 2);
     LOG.debug("List of all volumes: {}", volList);
     final String volumeName = volList.stream().filter(e ->
         !e.getVolume().equals(OZONE_S3_VOLUME_NAME_DEFAULT))  // Ignore s3v vol
@@ -198,12 +203,12 @@ public class TestOMSnapshotDAG {
     final OzoneVolume volume = store.getVolume(volumeName);
     final OzoneBucket bucket = volume.getBucket(bucketName);
 
-    for (int i = 0; i < 6000; i++) {
+    for (int i = 0; i < 2000; i++) {
       bucket.createKey("b_" + i, 0).close();
     }
 
     // Create another snapshot
-    resp = store.createSnapshot(volumeName, bucketName, "snap3");
+    resp = store.createSnapshot(volumeName, bucketName, "snap2");
     LOG.debug("Snapshot created: {}", resp);
 
     // Get snapshot SST diff list
@@ -214,27 +219,51 @@ public class TestOMSnapshotDAG {
 
     DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager,
         volumeName, bucketName, "snap1");
-    DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap3");
+    DifferSnapshotInfo snap2 = getDifferSnapshotInfo(omMetadataManager,
+        volumeName, bucketName, "snap2");
 
     // RocksDB does checkpointing in a separate thread, wait for it
     final File checkpointSnap1 = new File(snap1.getDbPath());
     GenericTestUtils.waitFor(checkpointSnap1::exists, 2000, 20000);
+    final File checkpointSnap2 = new File(snap2.getDbPath());
+    GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000);
+
+    List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1);
+    LOG.debug("Got diff list: {}", sstDiffList21);
+
+    // Delete 1000 keys, take a 3rd snapshot, and do another diff
+    for (int i = 0; i < 1000; i++) {
+      bucket.deleteKey("b_" + i);
+    }
+
+    resp = store.createSnapshot(volumeName, bucketName, "snap3");
+    LOG.debug("Snapshot created: {}", resp);
+
+    DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
+        volumeName, bucketName, "snap3");
     final File checkpointSnap3 = new File(snap3.getDbPath());
     GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);
 
-    List<String> actualDiffList = differ.getSSTDiffList(snap3, snap1);
-    LOG.debug("Got diff list: {}", actualDiffList);
-    // Hard-coded expected output.
-    // The result is deterministic. Retrieved from a successful run.
-    final List<String> expectedDiffList = Collections.singletonList("000059");
-    Assertions.assertEquals(expectedDiffList, actualDiffList);
-
-    // TODO: Use smaller DB write buffer size (currently it is set to 128 MB
-    //  in DBProfile), or generate enough keys (in the millions) to trigger
-    //  RDB compaction. Take another snapshot and do the diff again.
-    //  Then restart OM, do the same diff again to see if DAG reconstruction
-    //  works.
+    List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2);
+
+    // snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1
+    List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1);
+
+    // Same snapshot. Result should be empty list
+    List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2);
+    Assertions.assertTrue(sstDiffList22.isEmpty());
+
+    // Test DAG reconstruction by restarting OM. Then do the same diffs again
+    cluster.restartOzoneManager();
+
+    List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1);
+    Assertions.assertEquals(sstDiffList21, sstDiffList21Run2);
+
+    List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2);
+    Assertions.assertEquals(sstDiffList32, sstDiffList32Run2);
+
+    List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1);
+    Assertions.assertEquals(sstDiffList31, sstDiffList31Run2);
   }
 
-}
\ No newline at end of file
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index a7fa062909..45bdafee7d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -150,7 +150,8 @@ public final class OmSnapshotManager {
 
     // Write snapshot generation (latest sequence number) to compaction log.
     // This will be used for DAG reconstruction as snapshotGeneration.
-    dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber);
+    dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber,
+        snapshotInfo.getSnapshotID());
 
     // Set compaction log filename to the latest DB sequence number
     // right after taking the RocksDB checkpoint for Ozone snapshot.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to