This is an automated email from the ASF dual-hosted git repository.

hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cc773e83e8 HDDS-9311. Use compactionLogTable to store compaction information (#5317)
cc773e83e8 is described below

commit cc773e83e8545c83c36ca583ccec2f8c5d985f70
Author: Hemant Kumar <[email protected]>
AuthorDate: Wed Oct 4 09:20:31 2023 -0700

    HDDS-9311. Use compactionLogTable to store compaction information (#5317)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   5 +
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |  14 +-
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 596 +++++++---------
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 781 ++++++++++++++-------
 .../ozone/om/TestSnapshotBackgroundServices.java   |  97 ++-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |  22 -
 6 files changed, 834 insertions(+), 681 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 27855d187d..e802de0666 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -606,4 +606,9 @@ public final class OzoneConsts {
    */
   public static final String SNAPSHOT_INFO_TABLE = "snapshotInfoTable";
 
+  /**
+   * DB compaction log table name. Referenced in RDBStore.
+   */
+  public static final String COMPACTION_LOG_TABLE =
+      "compactionLogTable";
 }
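
For context, the new constant names a RocksDB column family. A minimal, hypothetical sketch (not part of this commit) of opening a DB that contains this column family using the plain RocksJava API:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public final class CompactionLogTableDemo {
      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
            // Name must match OzoneConsts.COMPACTION_LOG_TABLE.
            new ColumnFamilyDescriptor(
                "compactionLogTable".getBytes(StandardCharsets.UTF_8)));
        List<ColumnFamilyHandle> handles = new ArrayList<>();
        DBOptions options = new DBOptions()
            .setCreateIfMissing(true)
            .setCreateMissingColumnFamilies(true);
        RocksDB db = RocksDB.open(options, "/tmp/demo-db", descriptors, handles);
        try {
          ColumnFamilyHandle compactionLogTable = handles.get(1);
          // Reads and writes against the table go through this handle.
        } finally {
          // Close CF handles before the DB itself.
          handles.forEach(ColumnFamilyHandle::close);
          db.close();
          options.close();
        }
      }
    }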
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 7ccb01d79f..371e6abf5e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -49,6 +49,7 @@ import org.rocksdb.TransactionLogIterator.BatchResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConsts.COMPACTION_LOG_TABLE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
@@ -150,10 +151,15 @@ public class RDBStore implements DBStore {
         // Set CF handle in differ to be used in DB listener
         rocksDBCheckpointDiffer.setSnapshotInfoTableCFHandle(
             ssInfoTableCF.getHandle());
-        // Finish the initialization of compaction DAG tracker by setting the
-        // sequence number as current compaction log filename.
-        rocksDBCheckpointDiffer.setCurrentCompactionLog(
-            db.getLatestSequenceNumber());
+        // Set CF handle in differ to store compaction log entries.
+        ColumnFamily compactionLogTableCF =
+            db.getColumnFamily(COMPACTION_LOG_TABLE);
+        Preconditions.checkNotNull(compactionLogTableCF,
+            "CompactionLogTable column family handle should not be null.");
+        rocksDBCheckpointDiffer.setCompactionLogTableCFHandle(
+            compactionLogTableCF.getHandle());
+        // Set activeRocksDB in differ to access compaction log CF.
+        rocksDBCheckpointDiffer.setActiveRocksDB(db.getManagedRocksDb().get());
         // Load all previous compaction logs
         rocksDBCheckpointDiffer.loadAllCompactionLogs();
       }
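
The ordering of these calls matters: loadAllCompactionLogs() now asserts (via the preconditions added below) that both the column family handle and the active DB have been set. As a usage sketch, once the wiring above is in place the newest persisted entry could be read back roughly like this (hypothetical snippet reusing names from this diff):

    try (ManagedRocksIterator it = new ManagedRocksIterator(
        db.getManagedRocksDb().get().newIterator(compactionLogTableCF.getHandle()))) {
      it.get().seekToLast();
      if (it.get().isValid()) {
        // Values are CompactionLogEntryProto blobs keyed by the
        // zero-padded sequence number plus compaction time.
        CompactionLogEntry entry = CompactionLogEntry.getFromProtobuf(
            CompactionLogEntryProto.parseFrom(it.get().value()));
        LOG.info("Latest compaction at sequence number {}",
            entry.getDbSequenceNumber());
      }
    } catch (InvalidProtocolBufferException e) {
      throw new RuntimeException(e);
    }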
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index d8fe2d50ab..ebf4473a54 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -30,15 +30,21 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.Scheduler;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.ozone.compaction.log.CompactionFileInfo;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
 import org.apache.ozone.rocksdb.util.RdbUtil;
 import org.apache.ozone.graph.PrintableGraph;
 import org.apache.ozone.graph.PrintableGraph.GraphType;
@@ -51,6 +57,7 @@ import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.SstFileReader;
+import org.rocksdb.SstFileReaderIterator;
 import org.rocksdb.TableProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +84,6 @@ import java.util.stream.Stream;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Arrays.asList;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
@@ -116,16 +122,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
 
   private final String compactionLogDir;
 
-  /**
-   * Compaction log path for DB compaction history persistence.
-   * This is the source of truth for in-memory SST DAG reconstruction upon
-   * OM restarts.
-   * <p>
-   * Initialized to the latest sequence number on OM startup. The log also rolls
-   * over (gets appended to a new file) whenever an Ozone snapshot is taken.
-   */
-  private volatile String currentCompactionLogPath = null;
-
   public static final String COMPACTION_LOG_FILE_NAME_SUFFIX = ".log";
 
   /**
@@ -174,8 +170,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   /**
    * Used during DAG reconstruction.
    */
-  private long reconstructionSnapshotGeneration;
-  private String reconstructionLastSnapshotID;
+  private long reconstructionSnapshotCreationTime;
+  private String reconstructionCompactionReason;
 
   private final Scheduler scheduler;
   private volatile boolean closed;
@@ -188,6 +184,9 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   private final String dagPruningServiceName = "CompactionDagPruningService";
   private AtomicBoolean suspended;
 
+  private ColumnFamilyHandle compactionLogTableCFHandle;
+  private RocksDB activeRocksDB;
+
   /**
    * This is a package private constructor and should not be used other than
    * testing. Caller should use RocksDBCheckpointDifferHolder#getInstance() to
@@ -303,36 +302,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     }
   }
 
-  /**
-   * Set the current compaction log filename with a given RDB sequence number.
-   * @param latestSequenceNum latest sequence number of RDB.
-   */
-  public void setCurrentCompactionLog(long latestSequenceNum) {
-    String latestSequenceIdStr = String.valueOf(latestSequenceNum);
-
-    if (latestSequenceIdStr.length() < LONG_MAX_STR_LEN) {
-      // Pad zeroes to the left for ordered file listing when sorted
-      // alphabetically.
-      latestSequenceIdStr =
-          StringUtils.leftPad(latestSequenceIdStr, LONG_MAX_STR_LEN, "0");
-    }
-
-    // Local temp variable for storing the new compaction log file path
-    final String newCompactionLog = compactionLogDir + latestSequenceIdStr +
-        COMPACTION_LOG_FILE_NAME_SUFFIX;
-
-    File clFile = new File(newCompactionLog);
-    if (clFile.exists()) {
-      LOG.warn("Compaction log exists: {}. Will append", newCompactionLog);
-    }
-
-    this.currentCompactionLogPath = newCompactionLog;
-
-    // Create empty file if it doesn't exist
-    appendToCurrentCompactionLog("");
-  }
-
-
   @Override
   public void close() throws Exception {
     if (!closed) {
@@ -381,43 +350,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     DEBUG_LEVEL.add(level);
   }
 
-  /**
-   * Append (then flush) to the current compaction log file path.
-   * Note: This does NOT automatically append newline to the log.
-   */
-  private void appendToCurrentCompactionLog(String content) {
-    if (currentCompactionLogPath == null) {
-      LOG.error("Unable to append compaction log. "
-          + "Compaction log path is not set. "
-          + "Please check initialization.");
-      throw new RuntimeException("Compaction log path not set");
-    }
-
-    synchronized (this) {
-      try (BufferedWriter bw = Files.newBufferedWriter(
-          Paths.get(currentCompactionLogPath),
-          StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
-        bw.write(content);
-        bw.flush();
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to append compaction log to " +
-            currentCompactionLogPath, e);
-      }
-    }
-  }
-
-  /**
-   * Append a sequence number to the compaction log (roughly) when an Ozone
-   * snapshot (RDB checkpoint) is taken.
-   */
-  public void appendSnapshotInfoToCompactionLog(long sequenceNum,
-                                                String snapshotID,
-                                                long creationTime) {
-    final String line = COMPACTION_LOG_SEQ_NUM_LINE_PREFIX + sequenceNum +
-        SPACE_DELIMITER + snapshotID + SPACE_DELIMITER + creationTime + "\n";
-    appendToCurrentCompactionLog(line);
-  }
-
   /**
    * Takes {@link org.rocksdb.Options}.
    */
@@ -457,6 +389,26 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     this.snapshotInfoTableCFHandle = snapshotInfoTableCFHandle;
   }
 
+  /**
+   * Set CompactionLogTable DB column family handle to access the table.
+   * @param compactionLogTableCFHandle ColumnFamilyHandle
+   */
+  public synchronized void setCompactionLogTableCFHandle(
+      ColumnFamilyHandle compactionLogTableCFHandle) {
+    Preconditions.checkNotNull(compactionLogTableCFHandle,
+        "Column family handle should not be null");
+    this.compactionLogTableCFHandle = compactionLogTableCFHandle;
+  }
+
+  /**
+   * Set activeRocksDB to access CompactionLogTable.
+   * @param activeRocksDB RocksDB
+   */
+  public synchronized void setActiveRocksDB(RocksDB activeRocksDB) {
+    Preconditions.checkNotNull(activeRocksDB, "RocksDB should not be null.");
+    this.activeRocksDB = activeRocksDB;
+  }
+
   /**
   * Helper method to check whether the SnapshotInfoTable column family is empty
    * in a given DB instance.
@@ -548,49 +500,25 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
           return;
         }
 
-        final StringBuilder sb = new StringBuilder();
+        long trxId = db.getLatestSequenceNumber();
+
+        CompactionLogEntry.Builder builder;
+        try (ManagedOptions options = new ManagedOptions();
+             ManagedReadOptions readOptions = new ManagedReadOptions()) {
+          builder = new CompactionLogEntry.Builder(trxId,
+              System.currentTimeMillis(),
+              toFileInfoList(compactionJobInfo.inputFiles(), options,
+                  readOptions),
+              toFileInfoList(compactionJobInfo.outputFiles(), options,
+                  readOptions));
+        }
 
         if (LOG.isDebugEnabled()) {
-          // Print compaction reason for this entry in the log file
-          // e.g. kLevelL0FilesNum / kLevelMaxLevelSize.
-          sb.append(COMPACTION_LOG_COMMENT_LINE_PREFIX)
-              .append(compactionJobInfo.compactionReason())
-              .append('\n');
+          builder = builder.setCompactionReason(
+              compactionJobInfo.compactionReason().toString());
         }
 
-        // Mark the beginning of a compaction log
-        sb.append(COMPACTION_LOG_ENTRY_LINE_PREFIX);
-        sb.append(db.getLatestSequenceNumber());
-        sb.append(SPACE_DELIMITER);
-
-        // Trim DB path, only keep the SST file name
-        final int filenameOffset =
-            compactionJobInfo.inputFiles().get(0).lastIndexOf("/") + 1;
-
-        // Append the list of input files
-        final List<String> inputFiles = compactionJobInfo.inputFiles();
-        // Trim the file path, leave only the SST file name without extension
-        inputFiles.replaceAll(s -> s.substring(
-            filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
-        final String inputFilesJoined =
-            String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, inputFiles);
-        sb.append(inputFilesJoined);
-
-        // Insert delimiter between input files and output files
-        sb.append(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
-
-        // Append the list of output files
-        final List<String> outputFiles = compactionJobInfo.outputFiles();
-        outputFiles.replaceAll(s -> s.substring(
-            filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
-        final String outputFilesJoined =
-            String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, outputFiles);
-        sb.append(outputFilesJoined);
-
-        // End of line
-        sb.append('\n');
-
-        String content = sb.toString();
+        CompactionLogEntry compactionLogEntry = builder.build();
 
         synchronized (this) {
           if (closed) {
@@ -605,19 +533,49 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
 
           waitForTarballCreation();
 
-          // Write input and output file names to compaction log
-          appendToCurrentCompactionLog(content);
+
+          // Add the compaction log entry to the compaction log table.
+          addToCompactionLogTable(compactionLogEntry);
 
           // Populate the DAG
           // TODO: [SNAPSHOT] Once SnapshotChainManager is put into use,
           //  set snapshotID to snapshotChainManager.getLatestGlobalSnapshot()
-          populateCompactionDAG(inputFiles, outputFiles, null,
+          populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
+              compactionLogEntry.getOutputFileInfoList(), null,
               db.getLatestSequenceNumber());
         }
       }
     };
   }
 
+  @VisibleForTesting
+  void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) {
+    String dbSequenceIdStr =
+        String.valueOf(compactionLogEntry.getDbSequenceNumber());
+
+    if (dbSequenceIdStr.length() < LONG_MAX_STR_LEN) {
+      // Pad zeroes to the left to ensure lexicographic ordering.
+      dbSequenceIdStr = org.apache.commons.lang3.StringUtils.leftPad(
+          dbSequenceIdStr, LONG_MAX_STR_LEN, "0");
+    }
+
+    // Key is in the format transactionId-currentTime.
+    // trxId alone can't be used because multiple compactions might be
+    // running and it is possible that no new entry was added to the DB.
+    // Appending the current time to the transactionId avoids key collisions.
+    String keyString = dbSequenceIdStr + "-" +
+        compactionLogEntry.getCompactionTime();
+
+    byte[] key = keyString.getBytes(UTF_8);
+    byte[] value = compactionLogEntry.getProtobuf().toByteArray();
+    try {
+      activeRocksDB.put(compactionLogTableCFHandle, key, value);
+    } catch (RocksDBException exception) {
+      // TODO: Revisit exception handling before merging the PR.
+      throw new RuntimeException(exception);
+    }
+  }
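
The left padding is what makes RocksDB's byte-wise key comparison agree with numeric sequence order; without it, "9" sorts after "10". A small self-contained illustration of the key scheme (an editor's sketch, not code from this commit):

    import org.apache.commons.lang3.StringUtils;

    public final class KeyOrderingDemo {
      private static final int LONG_MAX_STR_LEN =
          String.valueOf(Long.MAX_VALUE).length();

      // Same scheme as addToCompactionLogTable: zero-padded DB sequence
      // number, then wall-clock time to break ties between concurrent
      // compactions at the same sequence number.
      static String key(long dbSequenceNumber, long compactionTimeMs) {
        return StringUtils.leftPad(
            String.valueOf(dbSequenceNumber), LONG_MAX_STR_LEN, "0")
            + "-" + compactionTimeMs;
      }

      public static void main(String[] args) {
        // true: unpadded string order disagrees with numeric order.
        System.out.println("9".compareTo("10") > 0);
        // true: padded keys sort in numeric sequence order.
        System.out.println(key(9, 1L).compareTo(key(10, 1L)) < 0);
      }
    }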
+
   /**
    * Check if there is any in_progress tarball creation request and wait till
    * all tarball creation finish, and it gets notified.
@@ -740,20 +698,20 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   }
 
   /**
-   * Process each line of compaction log text file input and populate the DAG.
+   * Process each line of the compaction log text file and populate the DAG.
+   * It also adds the compaction log entry to the compaction log table.
    */
   void processCompactionLogLine(String line) {
 
     LOG.debug("Processing line: {}", line);
 
     synchronized (this) {
-      if (line.startsWith("#")) {
-        // Skip comments
-        LOG.debug("Comment line, skipped");
+      if (line.startsWith(COMPACTION_LOG_COMMENT_LINE_PREFIX)) {
+        reconstructionCompactionReason =
+            line.substring(COMPACTION_LOG_COMMENT_LINE_PREFIX.length());
       } else if (line.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
-        SnapshotLogInfo snapshotLogInfo = getSnapshotLogInfo(line);
-        reconstructionSnapshotGeneration = snapshotLogInfo.snapshotGenerationId;
-        reconstructionLastSnapshotID = snapshotLogInfo.snapshotId;
+        reconstructionSnapshotCreationTime =
+            getSnapshotCreationTimeFromLogLine(line);
       } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
         // Compaction log entry is like following:
         // C sequence_number input_files:output_files
@@ -764,6 +722,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
           return;
         }
 
+        String dbSequenceNumber = lineSpilt[1];
         String[] io = lineSpilt[2]
             .split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
 
@@ -778,18 +737,21 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
 
         String[] inputFiles = io[0].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
        String[] outputFiles = io[1].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
-        populateCompactionDAG(asList(inputFiles), asList(outputFiles),
-            reconstructionLastSnapshotID, reconstructionSnapshotGeneration);
+        addFileInfoToCompactionLogTable(Long.parseLong(dbSequenceNumber),
+            reconstructionSnapshotCreationTime, inputFiles, outputFiles,
+            reconstructionCompactionReason);
       } else {
         LOG.error("Invalid line in compaction log: {}", line);
       }
     }
   }
 
+
   /**
-   * Helper to read compaction log to the internal DAG.
+   * Helper to read compaction log file to the internal DAG and compaction log
+   * table.
    */
-  private void readCompactionLogToDAG(String currCompactionLogPath) {
+  private void readCompactionLogFile(String currCompactionLogPath) {
     LOG.debug("Loading compaction log: {}", currCompactionLogPath);
     try (Stream<String> logLineStream =
         Files.lines(Paths.get(currCompactionLogPath), UTF_8)) {
@@ -799,24 +761,20 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable,
     }
   }
 
-  /**
-   * Load existing compaction log files to the in-memory DAG.
-   * This only needs to be done once during OM startup.
-   */
-  public void loadAllCompactionLogs() {
+  public void addEntriesFromLogFilesToDagAndCompactionLogTable() {
     synchronized (this) {
-      if (compactionLogDir == null) {
-        throw new RuntimeException("Compaction log directory must be set " +
-            "first");
-      }
-      reconstructionSnapshotGeneration = 0L;
+      reconstructionSnapshotCreationTime = 0L;
+      reconstructionCompactionReason = null;
       try {
         try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
             .filter(e -> e.toString().toLowerCase()
                 .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
             .sorted()) {
           for (Path logPath : pathStream.collect(Collectors.toList())) {
-            readCompactionLogToDAG(logPath.toString());
+            readCompactionLogFile(logPath.toString());
+            // Delete the file once entries are added to the compaction log
+            // table so that on the next restart, only the table is used.
+            Files.delete(logPath);
           }
         }
       } catch (IOException e) {
@@ -826,6 +784,43 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     }
   }
 
+  /**
+   * Load existing compaction log entries from the table into the in-memory DAG.
+   * This only needs to be done once during OM startup.
+   */
+  public void loadAllCompactionLogs() {
+    synchronized (this) {
+      preconditionChecksForLoadAllCompactionLogs();
+      addEntriesFromLogFilesToDagAndCompactionLogTable();
+      try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator(
+          activeRocksDB.newIterator(compactionLogTableCFHandle))) {
+        managedRocksIterator.get().seekToFirst();
+        while (managedRocksIterator.get().isValid()) {
+          byte[] value = managedRocksIterator.get().value();
+          CompactionLogEntry compactionLogEntry =
+              CompactionLogEntry.getFromProtobuf(
+                  CompactionLogEntryProto.parseFrom(value));
+          populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
+              compactionLogEntry.getOutputFileInfoList(),
+              null,
+              compactionLogEntry.getDbSequenceNumber());
+          managedRocksIterator.get().next();
+        }
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private void preconditionChecksForLoadAllCompactionLogs() {
+    Preconditions.checkNotNull(compactionLogDir,
+        "Compaction log directory must be set.");
+    Preconditions.checkNotNull(compactionLogTableCFHandle,
+        "compactionLogTableCFHandle must be set before calling " +
+            "loadAllCompactionLogs.");
+    Preconditions.checkNotNull(activeRocksDB,
+        "activeRocksDB must be set before calling loadAllCompactionLogs.");
+  }
   /**
    * Helper function that prepends SST file name with SST backup directory path
   * (or DB checkpoint path if compaction hasn't happened yet as SST files won't
@@ -1096,12 +1091,12 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   }
 
   @VisibleForTesting
-  MutableGraph<CompactionNode> getForwardCompactionDAG() {
+  public MutableGraph<CompactionNode> getForwardCompactionDAG() {
     return forwardCompactionDAG;
   }
 
   @VisibleForTesting
-  MutableGraph<CompactionNode> getBackwardCompactionDAG() {
+  public MutableGraph<CompactionNode> getBackwardCompactionDAG() {
     return backwardCompactionDAG;
   }
 
@@ -1135,20 +1130,24 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
    *                   arbitrary String as long as it helps debugging.
    * @param seqNum DB transaction sequence number.
    */
-  private void populateCompactionDAG(List<String> inputFiles,
-      List<String> outputFiles, String snapshotId, long seqNum) {
+  private void populateCompactionDAG(List<CompactionFileInfo> inputFiles,
+                                     List<CompactionFileInfo> outputFiles,
+                                     String snapshotId,
+                                     long seqNum) {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Input files: {} -> Output files: {}", inputFiles, 
outputFiles);
     }
 
-    for (String outfile : outputFiles) {
+    for (CompactionFileInfo outfile : outputFiles) {
       final CompactionNode outfileNode = compactionNodeMap.computeIfAbsent(
-          outfile, file -> addNodeToDAG(file, snapshotId, seqNum));
+          outfile.getFileName(),
+          file -> addNodeToDAG(file, snapshotId, seqNum));
 
-      for (String infile : inputFiles) {
+      for (CompactionFileInfo infile : inputFiles) {
         final CompactionNode infileNode = compactionNodeMap.computeIfAbsent(
-            infile, file -> addNodeToDAG(file, snapshotId, seqNum));
+            infile.getFileName(),
+            file -> addNodeToDAG(file, snapshotId, seqNum));
         // Draw the edges
         if (!outfileNode.getFileName().equals(infileNode.getFileName())) {
           forwardCompactionDAG.putEdge(outfileNode, infileNode);
@@ -1156,7 +1155,30 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
         }
       }
     }
+  }
 
+  private void addFileInfoToCompactionLogTable(
+      long dbSequenceNumber,
+      long creationTime,
+      String[] inputFiles,
+      String[] outputFiles,
+      String compactionReason
+  ) {
+    List<CompactionFileInfo> inputFileInfoList = Arrays.stream(inputFiles)
+        .map(inputFile -> new CompactionFileInfo.Builder(inputFile).build())
+        .collect(Collectors.toList());
+    List<CompactionFileInfo> outputFileInfoList = Arrays.stream(outputFiles)
+        .map(outputFile -> new CompactionFileInfo.Builder(outputFile).build())
+        .collect(Collectors.toList());
+
+    CompactionLogEntry.Builder builder =
+        new CompactionLogEntry.Builder(dbSequenceNumber, creationTime,
+            inputFileInfoList, outputFileInfoList);
+    if (compactionReason != null) {
+      builder.setCompactionReason(compactionReason);
+    }
+
+    addToCompactionLogTable(builder.build());
   }
 
   /**
@@ -1169,11 +1191,10 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     if (!shouldRun()) {
       return;
     }
-
-    List<Path> olderSnapshotsLogFilePaths =
-        getOlderSnapshotsCompactionLogFilePaths();
-    List<String> lastCompactionSstFiles =
-        getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+    Pair<Set<String>, List<byte[]>> fileNodeToKeyPair =
+        getOlderFileNodes();
+    Set<String> lastCompactionSstFiles = fileNodeToKeyPair.getLeft();
+    List<byte[]> keysToRemove = fileNodeToKeyPair.getRight();
 
     Set<String> sstFileNodesRemoved =
         pruneSstFileNodesFromDag(lastCompactionSstFiles);
@@ -1185,85 +1206,72 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
 
     try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
       removeSstFiles(sstFileNodesRemoved);
-      deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+      removeKeyFromCompactionLogTable(keysToRemove);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
   }
 
-  /**
-   * Deletes the SST files from the backup directory if exists.
-   */
-  private void removeSstFiles(Set<String> sstFileNodes) {
-    for (String sstFileNode: sstFileNodes) {
-      File file =
-          new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
-      try {
-        Files.deleteIfExists(file.toPath());
-      } catch (IOException exception) {
-        LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
-      }
-    }
-  }
 
   /**
-   * Returns the list of compaction log files which are older than allowed
-   * max time in the compaction DAG.
+   * Returns the list of input files from the compaction entries which are
+   * older than the maximum allowed time in the compaction DAG.
    */
-  private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+  private synchronized Pair<Set<String>, List<byte[]>> getOlderFileNodes() {
     long compactionLogPruneStartTime = System.currentTimeMillis();
+    Set<String> compactionNodes = new HashSet<>();
+    List<byte[]> keysToRemove = new ArrayList<>();
+
+    try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator(
+        activeRocksDB.newIterator(compactionLogTableCFHandle))) {
+      managedRocksIterator.get().seekToFirst();
+      while (managedRocksIterator.get().isValid()) {
+        CompactionLogEntry compactionLogEntry = CompactionLogEntry
+            .getFromProtobuf(CompactionLogEntryProto
+                .parseFrom(managedRocksIterator.get().value()));
+
+        if (compactionLogPruneStartTime -
+            compactionLogEntry.getCompactionTime() < maxAllowedTimeInDag) {
+          break;
+        }
 
-    List<Path> compactionFiles =
-        listCompactionLogFileFromCompactionLogDirectory();
-
-    int index = compactionFiles.size() - 1;
-    for (; index >= 0; index--) {
-      Path compactionLogPath = compactionFiles.get(index);
-      SnapshotLogInfo snapshotLogInfo =
-          getSnapshotInfoFromLog(compactionLogPath);
-
-      if (snapshotLogInfo == null) {
-        continue;
-      }
+        compactionLogEntry.getInputFileInfoList()
+            .forEach(inputFileInfo ->
+                compactionNodes.add(inputFileInfo.getFileName()));
+        keysToRemove.add(managedRocksIterator.get().key());
+        managedRocksIterator.get().next();
 
-      if (compactionLogPruneStartTime - snapshotLogInfo.snapshotCreatedAt >
-          maxAllowedTimeInDag) {
-        break;
       }
+    } catch (InvalidProtocolBufferException exception) {
+      // TODO: Handle this properly before merging the PR.
+      throw new RuntimeException(exception);
     }
+    return Pair.of(compactionNodes, keysToRemove);
+  }
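
The early break above relies on an ordering invariant: keys begin with the zero-padded DB sequence number, and compaction time in practice only grows with the sequence number, so the iterator visits entries oldest-first and every entry after the first "young" one is young as well. A condensed sketch of the scan with hypothetical ages:

    // Ages (ms) of entries in iteration order, oldest -> newest.
    long[] ages = {900_000L, 700_000L, 300_000L, 100_000L};
    long maxAllowedTimeInDag = 600_000L;
    int prunable = 0;
    for (long age : ages) {
      if (age < maxAllowedTimeInDag) {
        break;      // everything after this entry is younger still
      }
      prunable++;   // old enough: collect its input files and key
    }
    // prunable == 2: only the first two entries get removed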
 
-    if (index >= 0) {
-      return compactionFiles.subList(0, index + 1);
-    } else {
-      return Collections.emptyList();
+  private synchronized void removeKeyFromCompactionLogTable(
+      List<byte[]> keysToRemove) {
+    try {
+      for (byte[] key: keysToRemove) {
+        activeRocksDB.delete(compactionLogTableCFHandle, key);
+      }
+    } catch (RocksDBException exception) {
+      // TODO Handle exception properly before merging the PR.
+      throw new RuntimeException(exception);
     }
   }
 
   /**
-   * Returns the list of compaction log file path from compaction log directory.
+   * Deletes the SST files from the backup directory if they exist.
    */
-  private List<Path> listCompactionLogFileFromCompactionLogDirectory() {
-    try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
-        .filter(e -> e.toString().toLowerCase()
-            .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
-        .sorted()) {
-      return pathStream.collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Error listing compaction log dir " +
-          compactionLogDir, e);
-    }
-  }
-
-  public void deleteOlderSnapshotsCompactionFiles(
-      List<Path> olderSnapshotsLogFilePaths) {
-
-    for (int i = 0; i < olderSnapshotsLogFilePaths.size(); i++) {
-      Path olderSnapshotsLogFilePath = olderSnapshotsLogFilePaths.get(i);
+  private void removeSstFiles(Set<String> sstFileNodes) {
+    for (String sstFileNode: sstFileNodes) {
+      File file =
+          new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
       try {
-        Files.deleteIfExists(olderSnapshotsLogFilePath);
+        Files.deleteIfExists(file.toPath());
       } catch (IOException exception) {
-        LOG.error("Failed to deleted SST file: {}", olderSnapshotsLogFilePath,
-            exception);
+        LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
       }
     }
   }
@@ -1272,7 +1280,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
    * Prunes forward and backward DAGs when oldest snapshot with compaction
    * history gets deleted.
    */
-  public Set<String> pruneSstFileNodesFromDag(List<String> sstFileNodes) {
+  public Set<String> pruneSstFileNodesFromDag(Set<String> sstFileNodes) {
     Set<CompactionNode> startNodes = new HashSet<>();
     for (String sstFileNode : sstFileNodes) {
       CompactionNode infileNode = compactionNodeMap.get(sstFileNode);
@@ -1353,29 +1361,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     return removedFiles;
   }
 
-  private SnapshotLogInfo getSnapshotInfoFromLog(Path compactionLogFile) {
-    AtomicReference<SnapshotLogInfo> snapshotLogInfo =
-        new AtomicReference<>();
-    try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
-      logStream.forEach(logLine -> {
-        if (!logLine.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
-          return;
-        }
-
-        snapshotLogInfo.set(getSnapshotLogInfo(logLine));
-      });
-    } catch (IOException exception) {
-      throw new RuntimeException("Failed to read compaction log file: " +
-          compactionLogFile, exception);
-    }
-
-    return snapshotLogInfo.get();
-  }
-
-  /**
-   * Converts a snapshot compaction log line to SnapshotLogInfo.
-   */
-  private SnapshotLogInfo getSnapshotLogInfo(String logLine) {
+  private long getSnapshotCreationTimeFromLogLine(String logLine) {
     // Remove `S ` from the line.
     String line =
         logLine.substring(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX.length());
@@ -1384,74 +1370,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     Preconditions.checkArgument(splits.length == 3,
         "Snapshot info log statement has more than expected parameters.");
 
-    return new SnapshotLogInfo(Long.parseLong(splits[0]),
-        splits[1],
-        Long.parseLong(splits[2]));
-  }
-
-  /**
-   * Returns the list of SST files got compacted in the last compaction from
-   * the provided list of compaction log files.
-   * We can't simply use last file from the list because it is possible that
-   * no compaction happened between the last snapshot and previous to that.
-   * Hence, we go over the list in reverse order and return the SST files from
-   * first the compaction happened in the reverse list.
-   * If no compaction happen at all, it returns empty list.
-   */
-  private List<String> getLastCompactionSstFiles(
-      List<Path> compactionLogFiles
-  ) {
-
-    if (compactionLogFiles.isEmpty()) {
-      return Collections.emptyList();
-    }
-    compactionLogFiles = new ArrayList<>(compactionLogFiles);
-    Collections.reverse(compactionLogFiles);
-
-    for (Path compactionLogFile: compactionLogFiles) {
-      List<String> sstFiles = getLastCompactionSstFiles(compactionLogFile);
-      if (!sstFiles.isEmpty()) {
-        return  sstFiles;
-      }
-    }
-
-    return Collections.emptyList();
-  }
-
-  private List<String>  getLastCompactionSstFiles(Path compactionLogFile) {
-
-    AtomicReference<String> sstFiles = new AtomicReference<>();
-
-    try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
-      logStream.forEach(logLine -> {
-        if (!logLine.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
-          return;
-        }
-        sstFiles.set(logLine);
-      });
-    } catch (IOException exception) {
-      throw new RuntimeException("Failed to read file: " + compactionLogFile,
-          exception);
-    }
-
-    String lastCompactionLogEntry = sstFiles.get();
-
-    if (StringUtils.isEmpty(lastCompactionLogEntry)) {
-      return Collections.emptyList();
-    }
-
-    // Trim the beginning
-    lastCompactionLogEntry = lastCompactionLogEntry
-        .substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
-
-    String[] io = lastCompactionLogEntry
-        .split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
-
-    assert (io.length == 2);
-
-    String[] outputFiles = io[1].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
-
-    return asList(outputFiles);
+    return Long.parseLong(splits[2]);
   }
 
   public String getSSTBackupDir() {
@@ -1462,20 +1381,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     return compactionLogDir;
   }
 
-  private static final class SnapshotLogInfo {
-    private final long snapshotGenerationId;
-    private final String snapshotId;
-    private final long snapshotCreatedAt;
-
-    private SnapshotLogInfo(long snapshotGenerationId,
-                            String snapshotId,
-                            long snapshotCreatedAt) {
-      this.snapshotGenerationId = snapshotGenerationId;
-      this.snapshotId = snapshotId;
-      this.snapshotCreatedAt = snapshotCreatedAt;
-    }
-  }
-
   /**
    * Defines the task that removes SST files from backup directory which are
    * not needed to generate snapshot diff using compaction DAG to clean
@@ -1537,11 +1442,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     return LOG;
   }
 
-  @VisibleForTesting
-  public String getCurrentCompactionLogPath() {
-    return currentCompactionLogPath;
-  }
-
   @VisibleForTesting
   public ConcurrentHashMap<String, CompactionNode> getCompactionNodeMap() {
     return compactionNodeMap;
@@ -1610,4 +1510,44 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
 
     graph.generateImage(filePath);
   }
+
+  private List<CompactionFileInfo> toFileInfoList(
+      List<String> sstFiles,
+      ManagedOptions options,
+      ManagedReadOptions readOptions
+  ) {
+    if (CollectionUtils.isEmpty(sstFiles)) {
+      return Collections.emptyList();
+    }
+
+    final int fileNameOffset = sstFiles.get(0).lastIndexOf("/") + 1;
+    List<CompactionFileInfo> response = new ArrayList<>();
+
+    for (String sstFile : sstFiles) {
+      String fileName = sstFile.substring(fileNameOffset,
+          sstFile.length() - SST_FILE_EXTENSION_LENGTH);
+      SstFileReader fileReader = new SstFileReader(options);
+      try {
+        fileReader.open(sstFile);
+        String columnFamily = StringUtils.bytes2String(
+            fileReader.getTableProperties().getColumnFamilyName());
+        SstFileReaderIterator iterator = fileReader.newIterator(readOptions);
+        iterator.seekToFirst();
+        String startKey = StringUtils.bytes2String(iterator.key());
+        iterator.seekToLast();
+        String endKey = StringUtils.bytes2String(iterator.key());
+
+        CompactionFileInfo fileInfo = new CompactionFileInfo.Builder(fileName)
+            .setStartRange(startKey)
+            .setEndRange(endKey)
+            .setColumnFamily(columnFamily)
+            .build();
+        response.add(fileInfo);
+      } catch (RocksDBException rocksDBException) {
+        throw new RuntimeException("Failed to read SST file: " + sstFile,
+            rocksDBException);
+      }
+    }
+    return response;
+  }
 }
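
toFileInfoList() opens every SST file with an SstFileReader to record its column family and first/last keys. Note that neither the reader nor its iterator is closed above. A self-contained sketch of the same lookup with explicit resource handling (plain RocksJava API; the SST path is hypothetical):

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.Options;
    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileReader;
    import org.rocksdb.SstFileReaderIterator;

    public final class SstRangeDemo {
      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options();
             ReadOptions readOptions = new ReadOptions();
             SstFileReader reader = new SstFileReader(options)) {
          reader.open("/path/to/000001.sst"); // hypothetical SST file
          String columnFamily = new String(
              reader.getTableProperties().getColumnFamilyName(),
              StandardCharsets.UTF_8);
          try (SstFileReaderIterator it = reader.newIterator(readOptions)) {
            it.seekToFirst();
            String startKey = new String(it.key(), StandardCharsets.UTF_8);
            it.seekToLast();
            String endKey = new String(it.key(), StandardCharsets.UTF_8);
            System.out.printf("range [%s, %s] in column family %s%n",
                startKey, endKey, columnFamily);
          }
        }
      }
    }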
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index d7022dde89..18118b142e 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Arrays.asList;
 import static java.util.concurrent.TimeUnit.MINUTES;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.graph.GraphBuilder;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -51,8 +52,12 @@ import com.google.common.graph.MutableGraph;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.ozone.compaction.log.CompactionFileInfo;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -82,6 +87,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTI
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.util.Time.now;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
@@ -121,9 +127,13 @@ public class TestRocksDBCheckpointDiffer {
   private File sstBackUpDir;
   private ConfigurationSource config;
   private ExecutorService executorService = Executors.newCachedThreadPool();
+  private RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
+  private RocksDB activeRocksDB;
+  private ColumnFamilyHandle keyTableCFHandle;
+  private ColumnFamilyHandle compactionLogTableCFHandle;
 
   @BeforeEach
-  public void init() {
+  public void init() throws RocksDBException {
     // Checkpoint differ log level. Set to DEBUG for verbose output
     GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.getLog(), Level.INFO);
     // Test class log level. Set to DEBUG for verbose output
@@ -152,6 +162,30 @@ public class TestRocksDBCheckpointDiffer {
         OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL,
         OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS)).thenReturn(0L);
+
+    rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(metadataDirName,
+        sstBackUpDirName,
+        compactionLogDirName,
+        activeDbDirName,
+        config);
+
+    ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
+        .optimizeUniversalStyleCompaction();
+    List<ColumnFamilyDescriptor> cfDescriptors = getCFDescriptorList(cfOpts);
+    List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+    DBOptions dbOptions = new DBOptions()
+        .setCreateIfMissing(true)
+        .setCreateMissingColumnFamilies(true);
+
+    rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
+    activeRocksDB = RocksDB.open(dbOptions, activeDbDirName, cfDescriptors,
+        cfHandles);
+    keyTableCFHandle = cfHandles.get(1);
+    compactionLogTableCFHandle = cfHandles.get(4);
+
+    rocksDBCheckpointDiffer.setCompactionLogTableCFHandle(cfHandles.get(4));
+    rocksDBCheckpointDiffer.setActiveRocksDB(activeRocksDB);
+    rocksDBCheckpointDiffer.loadAllCompactionLogs();
   }
 
   private void createDir(File file, String filePath) {
@@ -168,6 +202,10 @@ public class TestRocksDBCheckpointDiffer {
 
   @AfterEach
   public void cleanUp() {
+    IOUtils.closeQuietly(rocksDBCheckpointDiffer);
+    IOUtils.closeQuietly(keyTableCFHandle);
+    IOUtils.closeQuietly(compactionLogTableCFHandle);
+    IOUtils.closeQuietly(activeRocksDB);
     deleteDirectory(compactionLogDir);
     deleteDirectory(sstBackUpDir);
     deleteDirectory(metadataDirDir);
@@ -177,8 +215,77 @@ public class TestRocksDBCheckpointDiffer {
   /**
    * Test cases for testGetSSTDiffListWithoutDB.
    */
+  @SuppressWarnings("methodlength")
   private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
 
+    String compactionLog =
+        // Snapshot 0
+        "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5 " + now() + " \n"
+            // Additional "compaction" to trigger and test early exit condition
+            + "C 1291 000001,000002:000062\n"
+            // Snapshot 1
+            + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5 " + now() + " \n"
+            // Regular compaction
+            + "C 4023 000068,000062:000069\n"
+            // Trivial move
+            + "C 5647 
000071,000064,000060,000052:000071,000064,000060,000052\n"
+            + "C 7658 000073,000066:000074\n"
+            + "C 7872 000082,000076,000069:000083\n"
+            + "C 9001 000087,000080,000074:000088\n"
+            // Deletion?
+            + "C 12755 000093,000090,000083:\n"
+            // Snapshot 2
+            + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd " + now() + "\n"
+            + "C 16192 000098,000096,000085,000078,000071,000064,000060,000052"
+            + ":000099\n"
+            + "C 16762 000105,000095,000088:000107\n"
+            // Snapshot 3
+            + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea " + now() + "\n";
+
+    List<CompactionLogEntry> compactionLogEntries = Arrays.asList(
+        // Additional "compaction" to trigger and test early exit condition
+        createCompactionEntry(1291,
+            now(),
+            Arrays.asList("000001", "000002"),
+            Collections.singletonList("000062")),
+        // Regular compaction
+        createCompactionEntry(4023,
+            now(),
+            Arrays.asList("000068", "000062"),
+            Collections.singletonList("000069")),
+        // Trivial move
+        createCompactionEntry(5547,
+            now(),
+            Arrays.asList("000071", "000064", "000060", "000052"),
+            Arrays.asList("000071", "000064", "000060", "000062")),
+        createCompactionEntry(5647,
+            now(),
+            Arrays.asList("000073", "000066"),
+            Collections.singletonList("000074")),
+        createCompactionEntry(7872,
+            now(),
+            Arrays.asList("000082", "000076", "000069"),
+            Collections.singletonList("000083")),
+        createCompactionEntry(9001,
+            now(),
+            Arrays.asList("000087", "000080", "000074"),
+            Collections.singletonList("000088")),
+        // Deletion
+        createCompactionEntry(12755,
+            now(),
+            Arrays.asList("000093", "000090", "000083"),
+            Collections.emptyList()),
+        createCompactionEntry(16192,
+            now(),
+            Arrays.asList("000098", "000096", "000085", "000078", "000071",
+                "000064", "000060", "000052"),
+            Collections.singletonList("000099")),
+        createCompactionEntry(16762,
+            now(),
+            Arrays.asList("000105", "000095", "000088"),
+            Collections.singletonList("000107"))
+    );
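
createCompactionEntry is a test helper defined elsewhere in this file; it presumably wraps the CompactionLogEntry.Builder added by this commit, along these lines (hedged sketch; file names become CompactionFileInfo entries without range or column family metadata):

    private static CompactionLogEntry createCompactionEntry(
        long dbSequenceNumber, long compactionTime,
        List<String> inputFiles, List<String> outputFiles) {
      return new CompactionLogEntry.Builder(dbSequenceNumber, compactionTime,
          toFileInfoList(inputFiles), toFileInfoList(outputFiles)).build();
    }

    private static List<CompactionFileInfo> toFileInfoList(List<String> files) {
      return files.stream()
          .map(fileName -> new CompactionFileInfo.Builder(fileName).build())
          .collect(Collectors.toList());
    }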
+
     DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo(
         "/path/to/dbcp1", UUID.randomUUID(), 3008L, null, null);
     DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo(
@@ -188,72 +295,136 @@ public class TestRocksDBCheckpointDiffer {
     DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
         "/path/to/dbcp4", UUID.randomUUID(), 18000L, null, null);
 
-    Set<String> snapshotSstFiles1 = new HashSet<>(asList(
-        "000059", "000053"));
-    Set<String> snapshotSstFiles2 = new HashSet<>(asList(
-        "000088", "000059", "000053", "000095"));
-    Set<String> snapshotSstFiles3 = new HashSet<>(asList(
-        "000088", "000105", "000059", "000053", "000095"));
-    Set<String> snapshotSstFiles4 = new HashSet<>(asList(
-        "000088", "000105", "000059", "000053", "000095", "000108"));
-    Set<String> snapshotSstFiles1Alt1 = new HashSet<>(asList(
-        "000059", "000053", "000066"));
-    Set<String> snapshotSstFiles1Alt2 = new HashSet<>(asList(
-        "000059", "000053", "000052"));
-    Set<String> snapshotSstFiles2Alt2 = new HashSet<>(asList(
-        "000088", "000059", "000053", "000095", "000099"));
-    Set<String> snapshotSstFiles2Alt3 = new HashSet<>(asList(
-        "000088", "000059", "000053", "000062"));
+    Set<String> snapshotSstFiles1 = ImmutableSet.of("000059", "000053");
+    Set<String> snapshotSstFiles2 = ImmutableSet.of("000088", "000059",
+        "000053", "000095");
+    Set<String> snapshotSstFiles3 = ImmutableSet.of("000088", "000105",
+        "000059", "000053", "000095");
+    Set<String> snapshotSstFiles4 = ImmutableSet.of("000088", "000105",
+        "000059", "000053", "000095", "000108");
+    Set<String> snapshotSstFiles1Alt1 = ImmutableSet.of("000059", "000053",
+        "000066");
+    Set<String> snapshotSstFiles1Alt2 = ImmutableSet.of("000059", "000053",
+        "000052");
+    Set<String> snapshotSstFiles2Alt2 = ImmutableSet.of("000088", "000059",
+        "000053", "000095", "000099");
+    Set<String> snapshotSstFiles2Alt3 = ImmutableSet.of("000088", "000059",
+        "000053", "000062");
 
     return Stream.of(
-        Arguments.of("Test 1: Regular case. Expands expandable " +
-                "SSTs in the initial diff.",
+        Arguments.of("Test 1: Compaction log file regular case. " +
+                " Expands expandable SSTs in the initial diff.",
+            compactionLog,
+            null,
+            snapshotInfo3,
+            snapshotInfo1,
+            snapshotSstFiles3,
+            snapshotSstFiles1,
+            ImmutableSet.of("000059", "000053"),
+            ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
+                "000095"),
+            false),
+        Arguments.of("Test 2: Compaction log file crafted input: " +
+                "One source ('to' snapshot) SST file is never compacted " +
+                "(newly flushed)",
+            compactionLog,
+            null,
+            snapshotInfo4,
+            snapshotInfo3,
+            snapshotSstFiles4,
+            snapshotSstFiles3,
+            ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
+            ImmutableSet.of("000108"),
+            false),
+        Arguments.of("Test 3: Compaction log file crafted input: " +
+                "Same SST files found during SST expansion",
+            compactionLog,
+            null,
+            snapshotInfo2,
+            snapshotInfo1,
+            snapshotSstFiles2,
+            snapshotSstFiles1Alt1,
+            ImmutableSet.of("000066", "000059", "000053"),
+            ImmutableSet.of("000080", "000087", "000073", "000095"),
+            false),
+        Arguments.of("Test 4: Compaction log file crafted input: " +
+                "Skipping known processed SST.",
+            compactionLog,
+            null,
+            snapshotInfo2,
+            snapshotInfo1,
+            snapshotSstFiles2Alt2,
+            snapshotSstFiles1Alt2,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            true),
+        Arguments.of("Test 5: Compaction log file hit snapshot" +
+                " generation early exit condition",
+            compactionLog,
+            null,
+            snapshotInfo2,
+            snapshotInfo1,
+            snapshotSstFiles2Alt3,
+            snapshotSstFiles1,
+            ImmutableSet.of("000059", "000053"),
+            ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
+            false),
+        Arguments.of("Test 6: Compaction log table regular case. " +
+                "Expands expandable SSTs in the initial diff.",
+            null,
+            compactionLogEntries,
             snapshotInfo3,
             snapshotInfo1,
             snapshotSstFiles3,
             snapshotSstFiles1,
-            new HashSet<>(asList("000059", "000053")),
-            new HashSet<>(asList(
-                "000066", "000105", "000080", "000087", "000073", "000095")),
+            ImmutableSet.of("000059", "000053"),
+            ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
+                "000095"),
             false),
-        Arguments.of("Test 2: Crafted input: One source " +
-                "('to' snapshot) SST file is never compacted (newly flushed)",
+        Arguments.of("Test 7: Compaction log table crafted input: " +
+                "One source ('to' snapshot) SST file is never compacted " +
+                "(newly flushed)",
+            null,
+            compactionLogEntries,
             snapshotInfo4,
             snapshotInfo3,
             snapshotSstFiles4,
             snapshotSstFiles3,
-            new HashSet<>(asList(
-                "000088", "000105", "000059", "000053", "000095")),
-            new HashSet<>(asList("000108")),
+            ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
+            ImmutableSet.of("000108"),
             false),
-        Arguments.of("Test 3: Crafted input: Same SST files " +
-                "found during SST expansion",
+        Arguments.of("Test 8: Compaction log table crafted input: " +
+                "Same SST files found during SST expansion",
+            null,
+            compactionLogEntries,
             snapshotInfo2,
             snapshotInfo1,
             snapshotSstFiles2,
             snapshotSstFiles1Alt1,
-            new HashSet<>(asList("000066", "000059", "000053")),
-            new HashSet<>(asList(
-                "000080", "000087", "000073", "000095")),
+            ImmutableSet.of("000066", "000059", "000053"),
+            ImmutableSet.of("000080", "000087", "000073", "000095"),
             false),
-        Arguments.of("Test 4: Crafted input: Skipping known " +
-                "processed SST.",
+        Arguments.of("Test 9: Compaction log table crafted input: " +
+                "Skipping known processed SST.",
+            null,
+            compactionLogEntries,
             snapshotInfo2,
             snapshotInfo1,
             snapshotSstFiles2Alt2,
             snapshotSstFiles1Alt2,
-            new HashSet<>(),
-            new HashSet<>(),
+            Collections.emptySet(),
+            Collections.emptySet(),
             true),
-        Arguments.of("Test 5: Hit snapshot generation early exit " +
-                "condition",
+        Arguments.of("Test 10: Compaction log table hit snapshot " +
+                "generation early exit condition",
+            null,
+            compactionLogEntries,
             snapshotInfo2,
             snapshotInfo1,
             snapshotSstFiles2Alt3,
             snapshotSstFiles1,
-            new HashSet<>(asList("000059", "000053")),
-            new HashSet<>(asList(
-                "000066", "000080", "000087", "000073", "000062")),
+            ImmutableSet.of("000059", "000053"),
+            ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
             false)
     );
   }
@@ -266,6 +437,8 @@ public class TestRocksDBCheckpointDiffer {
   @MethodSource("casesGetSSTDiffListWithoutDB")
   @SuppressWarnings("parameternumber")
   public void testGetSSTDiffListWithoutDB(String description,
+      String compactionLog,
+      List<CompactionLogEntry> compactionLogEntries,
       DifferSnapshotInfo srcSnapshot,
       DifferSnapshotInfo destSnapshot,
       Set<String> srcSnapshotSstFiles,
@@ -274,55 +447,33 @@ public class TestRocksDBCheckpointDiffer {
       Set<String> expectedDiffSstFiles,
       boolean expectingException) {
 
-    RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(metadataDirName,
-            sstBackUpDirName,
-            compactionLogDirName,
-            activeDbDirName,
-            config);
     boolean exceptionThrown = false;
-    long createdTime = System.currentTimeMillis();
 
-    String compactionLog = ""
-        // Snapshot 0
-        + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5 " + createdTime + " \n"
-        // Additional "compaction" to trigger and test early exit condition
-        + "C 1291 000001,000002:000062\n"
-        // Snapshot 1
-        + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5 " + createdTime + " \n"
-        // Regular compaction
-        + "C 4023 000068,000062:000069\n"
-        // Trivial move
-        + "C 5647 000071,000064,000060,000052:000071,000064,000060,000052\n"
-        + "C 7658 000073,000066:000074\n"
-        + "C 7872 000082,000076,000069:000083\n"
-        + "C 9001 000087,000080,000074:000088\n"
-        // Deletion?
-        + "C 12755 000093,000090,000083:\n"
-        // Snapshot 2
-        + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd " + createdTime + "\n"
-        + "C 16192 000098,000096,000085,000078,000071,000064,000060,000052"
-        + ":000099\n"
-        + "C 16762 000105,000095,000088:000107\n"
-        // Snapshot 3
-        + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea " + createdTime + "\n";
-
-    // Construct DAG from compaction log input
-    Arrays.stream(compactionLog.split("\n")).forEach(
-        differ::processCompactionLogLine);
+    if (compactionLog != null) {
+      // Construct DAG from compaction log input
+      Arrays.stream(compactionLog.split("\n")).forEach(
+          rocksDBCheckpointDiffer::processCompactionLogLine);
+    } else if (compactionLogEntries != null) {
+      compactionLogEntries.forEach(entry ->
+          rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+    } else {
+      throw new IllegalArgumentException("One of compactionLog or " +
+          "compactionLogEntries should be non-null.");
+    }
+    rocksDBCheckpointDiffer.loadAllCompactionLogs();
 
     Set<String> actualSameSstFiles = new HashSet<>();
     Set<String> actualDiffSstFiles = new HashSet<>();
 
     try {
-      differ.internalGetSSTDiffList(
-              srcSnapshot,
-              destSnapshot,
-              srcSnapshotSstFiles,
-              destSnapshotSstFiles,
-              differ.getForwardCompactionDAG(),
-              actualSameSstFiles,
-              actualDiffSstFiles);
+      rocksDBCheckpointDiffer.internalGetSSTDiffList(
+          srcSnapshot,
+          destSnapshot,
+          srcSnapshotSstFiles,
+          destSnapshotSstFiles,
+          rocksDBCheckpointDiffer.getForwardCompactionDAG(),
+          actualSameSstFiles,
+          actualDiffSstFiles);
     } catch (RuntimeException rtEx) {
       if (!expectingException) {
         fail("Unexpected exception thrown in test.");
@@ -348,44 +499,37 @@ public class TestRocksDBCheckpointDiffer {
    */
   @Test
   void testDifferWithDB() throws Exception {
-    RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(metadataDirName,
-            sstBackUpDirName,
-            compactionLogDirName,
-            activeDbDirName,
-            config);
-    RocksDB rocksDB =
-        createRocksDBInstanceAndWriteKeys(activeDbDirName, differ);
-    readRocksDBInstance(activeDbDirName, rocksDB, null, differ);
+    writeKeysAndCheckpointing();
+    readRocksDBInstance(activeDbDirName, activeRocksDB, null,
+        rocksDBCheckpointDiffer);
 
     if (LOG.isDebugEnabled()) {
       printAllSnapshots();
     }
 
-    traverseGraph(differ.getCompactionNodeMap(),
-        differ.getBackwardCompactionDAG(),
-        differ.getForwardCompactionDAG());
+    traverseGraph(rocksDBCheckpointDiffer.getCompactionNodeMap(),
+        rocksDBCheckpointDiffer.getBackwardCompactionDAG(),
+        rocksDBCheckpointDiffer.getForwardCompactionDAG());
 
-    diffAllSnapshots(differ);
+    diffAllSnapshots(rocksDBCheckpointDiffer);
 
     // Confirm correct links created
     try (Stream<Path> sstPathStream = Files.list(sstBackUpDir.toPath())) {
       List<String> expectedLinks = sstPathStream.map(Path::getFileName)
               .map(Object::toString).sorted().collect(Collectors.toList());
       Assertions.assertEquals(expectedLinks, asList(
-              "000015.sst", "000017.sst", "000019.sst", "000021.sst",
-              "000022.sst", "000024.sst", "000026.sst"));
+              "000017.sst", "000019.sst", "000021.sst", "000023.sst",
+          "000024.sst", "000026.sst", "000029.sst"));
     }
 
     if (LOG.isDebugEnabled()) {
-      differ.dumpCompactionNodeTable();
+      rocksDBCheckpointDiffer.dumpCompactionNodeTable();
     }
 
-    rocksDB.close();
-    cleanUp();
+    cleanUpSnapshots();
   }
 
-  public void cleanup() {
+  public void cleanUpSnapshots() {
     for (DifferSnapshotInfo snap : snapshots) {
       snap.getRocksDB().close();
     }
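
On the link assertion in testDifferWithDB above: the SST backup directory
is expected to hold hard links to the live SST files rather than copies.
A minimal sketch of such a backup step with java.nio, assuming it mirrors
what the differ does (file name is illustrative):

    Path source = Paths.get(activeDbDirName, "000017.sst");
    Path link = Paths.get(sstBackUpDirName, "000017.sst");
    // Hard link: shares the source's inode, so no data is copied.
    Files.createLink(link, source);
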
@@ -412,12 +556,12 @@ public class TestRocksDBCheckpointDiffer {
     // Hard-coded expected output.
     // The results are deterministic. Retrieved from a successful run.
     final List<List<String>> expectedDifferResult = asList(
-        asList("000024", "000017", "000028", "000026", "000019", "000021"),
-        asList("000024", "000028", "000026", "000019", "000021"),
-        asList("000024", "000028", "000026", "000021"),
-        asList("000024", "000028", "000026"),
-        asList("000028", "000026"),
-        Collections.singletonList("000028"),
+        asList("000023", "000029", "000026", "000019", "000021", "000031"),
+        asList("000023", "000029", "000026", "000021", "000031"),
+        asList("000023", "000029", "000026", "000031"),
+        asList("000029", "000026", "000031"),
+        asList("000029", "000031"),
+        Collections.singletonList("000031"),
         Collections.emptyList()
     );
     Assertions.assertEquals(snapshots.size(), expectedDifferResult.size());
@@ -437,8 +581,7 @@ public class TestRocksDBCheckpointDiffer {
   /**
    * Helper function that creates an RDB checkpoint (= Ozone snapshot).
    */
-  private void createCheckpoint(RocksDBCheckpointDiffer differ,
-      RocksDB rocksDB) throws RocksDBException {
+  private void createCheckpoint(RocksDB rocksDB) throws RocksDBException {
 
     LOG.trace("Current time: " + System.currentTimeMillis());
     long t1 = System.currentTimeMillis();
@@ -452,8 +595,6 @@ public class TestRocksDBCheckpointDiffer {
       deleteDirectory(dir);
     }
 
-    final long dbLatestSequenceNumber = rocksDB.getLatestSequenceNumber();
-
     createCheckPoint(activeDbDirName, cpPath, rocksDB);
     final UUID snapshotId = UUID.randomUUID();
     List<ColumnFamilyHandle> colHandle = new ArrayList<>();
@@ -464,21 +605,13 @@ public class TestRocksDBCheckpointDiffer {
                 colHandle));
     this.snapshots.add(currentSnapshot);
 
-    // Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
-    differ.appendSnapshotInfoToCompactionLog(dbLatestSequenceNumber,
-        snapshotId.toString(),
-        System.currentTimeMillis());
-
-    differ.setCurrentCompactionLog(dbLatestSequenceNumber);
-
     long t2 = System.currentTimeMillis();
     LOG.trace("Current time: " + t2);
     LOG.debug("Time elapsed: " + (t2 - t1) + " ms");
   }
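
For reference, flushing the WAL and cutting the checkpoint (what
createCheckPoint below does) are both stock RocksDB operations. A minimal
sketch using the org.rocksdb API, assuming the checkpoint directory does
not already exist:

    rocksDB.flush(new FlushOptions().setWaitForFlush(true));
    try (Checkpoint checkpoint = Checkpoint.create(rocksDB)) {
      // Creates the directory, hard-linking SSTs where possible.
      checkpoint.createCheckpoint(cpPathArg);
    }
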
 
  // Flushes the WAL and creates a RocksDB checkpoint
-  void createCheckPoint(String dbPathArg, String cpPathArg,
-      RocksDB rocksDB) {
+  void createCheckPoint(String dbPathArg, String cpPathArg, RocksDB rocksDB) {
     LOG.debug("Creating RocksDB '{}' checkpoint at '{}'", dbPathArg, 
cpPathArg);
     try {
       rocksDB.flush(new FlushOptions());
@@ -506,52 +639,24 @@ public class TestRocksDBCheckpointDiffer {
         new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
         new ColumnFamilyDescriptor("keyTable".getBytes(UTF_8), cfOpts),
         new ColumnFamilyDescriptor("directoryTable".getBytes(UTF_8), cfOpts),
-        new ColumnFamilyDescriptor("fileTable".getBytes(UTF_8), cfOpts)
+        new ColumnFamilyDescriptor("fileTable".getBytes(UTF_8), cfOpts),
+        new ColumnFamilyDescriptor("compactionLogTable".getBytes(UTF_8), cfOpts)
     );
   }
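
A minimal sketch of how a descriptor list like the one above is consumed:
RocksDB returns handles in descriptor order, so the compactionLogTable
handle added in this change comes back last (dbPath and cfOpts are
placeholders; this mirrors the open pattern removed below):

    List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
    DBOptions dbOptions = new DBOptions()
        .setCreateIfMissing(true)
        .setCreateMissingColumnFamilies(true);
    RocksDB db = RocksDB.open(dbOptions, dbPath,
        getCFDescriptorList(cfOpts), cfHandles);
    // Index 4 matches the fifth descriptor: "compactionLogTable".
    ColumnFamilyHandle compactionLogCf = cfHandles.get(4);
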
 
-  // Test Code to create sample RocksDB instance.
-  private RocksDB createRocksDBInstanceAndWriteKeys(String dbPathArg,
-      RocksDBCheckpointDiffer differ) throws RocksDBException {
-
-    LOG.debug("Creating RocksDB at '{}'", dbPathArg);
-
-    // Delete the test DB dir if it exists
-    File dir = new File(dbPathArg);
-    if (dir.exists()) {
-      deleteDirectory(dir);
-    }
-
-    final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
-        .optimizeUniversalStyleCompaction();
-    final List<ColumnFamilyDescriptor> cfDescriptors =
-        getCFDescriptorList(cfOpts);
-    List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
-
-    // Create a RocksDB instance with compaction tracking
-    final DBOptions dbOptions = new DBOptions()
-        .setCreateIfMissing(true)
-        .setCreateMissingColumnFamilies(true);
-    differ.setRocksDBForCompactionTracking(dbOptions);
-    RocksDB rocksDB = RocksDB.open(dbOptions, dbPathArg, cfDescriptors,
-        cfHandles);
-
-    differ.setCurrentCompactionLog(rocksDB.getLatestSequenceNumber());
-
-    // key-value
+  private void writeKeysAndCheckpointing() throws RocksDBException {
     for (int i = 0; i < NUM_ROW; ++i) {
       String generatedString = RandomStringUtils.randomAlphabetic(7);
       String keyStr = "Key-" + i + "-" + generatedString;
       String valueStr = "Val-" + i + "-" + generatedString;
       byte[] key = keyStr.getBytes(UTF_8);
       // Put entry in keyTable
-      rocksDB.put(cfHandles.get(1), key, valueStr.getBytes(UTF_8));
+      activeRocksDB.put(keyTableCFHandle, key, valueStr.getBytes(UTF_8));
       if (i % SNAPSHOT_EVERY_SO_MANY_KEYS == 0) {
-        createCheckpoint(differ, rocksDB);
+        createCheckpoint(activeRocksDB);
       }
     }
-    createCheckpoint(differ, rocksDB);
-    return rocksDB;
+    createCheckpoint(activeRocksDB);
   }
 
   private boolean deleteDirectory(File directoryToBeDeleted) {
@@ -883,15 +988,8 @@ public class TestRocksDBCheckpointDiffer {
                                    Set<CompactionNode> levelToBeRemoved,
                                    MutableGraph<CompactionNode> expectedDag,
                                    Set<String> expectedFileNodesRemoved) {
-
-    RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(metadataDirName,
-            sstBackUpDirName,
-            compactionLogDirName,
-            activeDbDirName,
-            config);
     Set<String> actualFileNodesRemoved =
-        differ.pruneBackwardDag(originalDag, levelToBeRemoved);
+        rocksDBCheckpointDiffer.pruneBackwardDag(originalDag, levelToBeRemoved);
     Assertions.assertEquals(expectedDag, originalDag);
     Assertions.assertEquals(actualFileNodesRemoved, expectedFileNodesRemoved);
   }
@@ -945,56 +1043,46 @@ public class TestRocksDBCheckpointDiffer {
                                   Set<CompactionNode> levelToBeRemoved,
                                   MutableGraph<CompactionNode> expectedDag,
                                   Set<String> expectedFileNodesRemoved) {
-
-    RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(metadataDirName,
-            sstBackUpDirName,
-            compactionLogDirName,
-            activeDbDirName,
-            config);
     Set<String> actualFileNodesRemoved =
-        differ.pruneForwardDag(originalDag, levelToBeRemoved);
+        rocksDBCheckpointDiffer.pruneForwardDag(originalDag, levelToBeRemoved);
     Assertions.assertEquals(expectedDag, originalDag);
     Assertions.assertEquals(actualFileNodesRemoved, expectedFileNodesRemoved);
   }
 
+  @SuppressWarnings("methodlength")
   private static Stream<Arguments> compactionDagPruningScenarios() {
     long currentTimeMillis = System.currentTimeMillis();
 
     String compactionLogFile0 = "S 1000 snapshotId0 " +
         (currentTimeMillis - MINUTES.toMillis(30)) + " \n";
-    String compactionLogFile1 = "C 1000 000015,000013,000011,000009:000018," +
+    String compactionLogFile1 = "C 1500 000015,000013,000011,000009:000018," +
         "000016,000017\n"
         + "S 2000 snapshotId1 " +
         (currentTimeMillis - MINUTES.toMillis(24)) + " \n";
 
-    String compactionLogFile2 = "C 1000 000018,000016,000017,000026,000024," +
+    String compactionLogFile2 = "C 2500 000018,000016,000017,000026,000024," +
         "000022,000020:000027,000030,000028,000031,000029\n"
         + "S 3000 snapshotId2 " +
         (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
 
-    String compactionLogFile3 = "C 1000 000027,000030,000028,000031,000029," +
+    String compactionLogFile3 = "C 3500 000027,000030,000028,000031,000029," +
         "000039,000037,000035,000033:000040,000044,000042,000043,000046," +
         "000041,000045\n"
-        + "S 3000 snapshotId3 " +
+        + "S 4000 snapshotId3 " +
         (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
 
-    String compactionLogFile4 = "C 1000 000040,000044,000042,000043,000046," +
+    String compactionLogFile4 = "C 4500 000040,000044,000042,000043,000046," +
         "000041,000045,000054,000052,000050,000048:000059,000055,000056," +
         "000060,000057,000058\n"
-        + "S 3000 snapshotId4 " +
+        + "S 5000 snapshotId4 " +
         (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
 
-    String compactionLogFileWithoutSnapshot1 = "C 1000 000015,000013,000011," +
+    String compactionLogFileWithoutSnapshot1 = "C 1500 000015,000013,000011," +
         "000009:000018,000016,000017\n" +
         "C 2000 000018,000016,000017,000026,000024,000022,000020" +
         ":000027,000030,000028,000031,000029\n";
 
-    String compactionLogFileWithoutSnapshot2 = "C 3000 000027,000030,000028," +
-        "000031,000029,000039,000037,000035,000033:000040,000044,000042," +
-        "000043,000046,000041,000045\n";
-
-    String compactionLogFileWithoutSnapshot3 = "C 4000 000040,000044,000042," +
+    String compactionLogFileWithoutSnapshot2 = "C 4500 000040,000044,000042," +
         "000043,000046,000041,000045,000054,000052,000050,000048:000059," +
         "000055,000056,000060,000057,000058\n";
 
@@ -1022,28 +1110,18 @@ public class TestRocksDBCheckpointDiffer {
         "S 3000 snapshotIdWithoutCompaction6 " +
             (currentTimeMillis - MINUTES.toMillis(3)) + " \n";
 
-    Set<String> expectedNodes = new HashSet<>(
-        Arrays.asList("000054", "000052", "000050", "000048", "000059",
-            "000055", "000056", "000060", "000057", "000058")
-    );
-
-    Set<String> expectedAllNodes = new HashSet<>(
-        Arrays.asList("000058", "000013", "000035", "000057", "000056",
-            "000011", "000033", "000055", "000018", "000017", "000039",
-            "000016", "000015", "000037", "000059", "000060", "000043",
-            "000020", "000042", "000041", "000040", "000024", "000046",
-            "000045", "000022", "000044", "000029", "000028", "000027",
-            "000026", "000048", "000009", "000050", "000054", "000031",
-            "000030", "000052")
-    );
+    Set<String> expectedNodes = ImmutableSet.of("000059", "000055", "000056",
+        "000060", "000057", "000058");
 
     return Stream.of(
         Arguments.of("Each compaction log file has only one snapshot and one" +
                 " compaction statement except first log file.",
             Arrays.asList(compactionLogFile0, compactionLogFile1,
                 compactionLogFile2, compactionLogFile3, compactionLogFile4),
+            null,
             expectedNodes,
-            4
+            4,
+            0
         ),
         Arguments.of("Compaction log doesn't have snapshot  because OM" +
                 " restarted. Restart happened before snapshot to be deleted.",
@@ -1051,39 +1129,30 @@ public class TestRocksDBCheckpointDiffer {
                 compactionLogFileWithoutSnapshot1,
                 compactionLogFile3,
                 compactionLogFile4),
+            null,
             expectedNodes,
-            3
+            4,
+            0
         ),
         Arguments.of("Compaction log doesn't have snapshot because OM" +
                 " restarted. Restart happened after snapshot to be deleted.",
             Arrays.asList(compactionLogFile0, compactionLogFile1,
                 compactionLogFile2, compactionLogFile3,
-                compactionLogFileWithoutSnapshot3,
+                compactionLogFileWithoutSnapshot2,
                 compactionLogFileOnlyWithSnapshot4),
+            null,
             expectedNodes,
-            4
+            4,
+            0
         ),
         Arguments.of("No compaction happened in between two snapshots.",
             Arrays.asList(compactionLogFile0, compactionLogFile1,
                 compactionLogFile2, compactionLogFile3,
                 compactionLogFileOnlyWithSnapshot1,
                 compactionLogFileOnlyWithSnapshot2, compactionLogFile4),
+            null,
             expectedNodes,
-            6
-        ),
-        Arguments.of("No snapshot is taken and only one compaction log file,",
-            Collections.singletonList(compactionLogFileWithoutSnapshot1 +
-                compactionLogFileWithoutSnapshot2 +
-                compactionLogFileWithoutSnapshot3),
-            expectedAllNodes,
-            0
-        ),
-        Arguments.of("No snapshot is taken but multiple compaction files" +
-                " because of OM restart.",
-            Arrays.asList(compactionLogFileWithoutSnapshot1,
-                compactionLogFileWithoutSnapshot2,
-                compactionLogFileWithoutSnapshot3),
-            expectedAllNodes,
+            4,
             0
         ),
         Arguments.of("Only contains snapshots but no compaction.",
@@ -1093,14 +1162,76 @@ public class TestRocksDBCheckpointDiffer {
                 compactionLogFileOnlyWithSnapshot4,
                 compactionLogFileOnlyWithSnapshot5,
                 compactionLogFileOnlyWithSnapshot6),
+            null,
             Collections.emptySet(),
-            3
+            0,
+            0
         ),
         Arguments.of("No file exists because compaction has not happened" +
                 " and snapshot is not taken.",
             Collections.emptyList(),
+            null,
             Collections.emptySet(),
+            0,
+            0
+        ),
+        Arguments.of("When compaction table is used case 1.",
+            null,
+            asList(createCompactionEntry(1500,
+                    (currentTimeMillis - MINUTES.toMillis(24)),
+                    asList("000015", "000013", "000011", "000009"),
+                    asList("000018", "000016", "000017")),
+                createCompactionEntry(2500,
+                    (currentTimeMillis - MINUTES.toMillis(20)),
+                    asList("000018", "000016", "000017", "000026", "000024",
+                        "000022", "000020"),
+                    asList("000027", "000030", "000028", "000031", "000029")),
+                createCompactionEntry(3500,
+                    (currentTimeMillis - MINUTES.toMillis(16)),
+                    asList("000027", "000030", "000028", "000031", "000029",
+                        "000039", "000037", "000035", "000033"),
+                    asList("000040", "000044", "000042", "000043", "000046",
+                        "000041", "000045")),
+                createCompactionEntry(4500,
+                    (currentTimeMillis - MINUTES.toMillis(12)),
+                    asList("000040", "000044", "000042", "000043", "000046",
+                        "000041", "000045", "000054", "000052", "000050",
+                        "000048"),
+                    asList("000059", "000055", "000056", "000060", "000057",
+                        "000058"))),
+            expectedNodes,
+            4,
             0
+        ),
+        Arguments.of("When compaction table is used case 2.",
+            null,
+            asList(createCompactionEntry(1500,
+                    (currentTimeMillis - MINUTES.toMillis(24)),
+                    asList("000015", "000013", "000011", "000009"),
+                    asList("000018", "000016", "000017")),
+                createCompactionEntry(2500,
+                    (currentTimeMillis - MINUTES.toMillis(18)),
+                    asList("000018", "000016", "000017", "000026", "000024",
+                        "000022", "000020"),
+                    asList("000027", "000030", "000028", "000031", "000029")),
+                createCompactionEntry(3500,
+                    (currentTimeMillis - MINUTES.toMillis(12)),
+                    asList("000027", "000030", "000028", "000031", "000029",
+                        "000039", "000037", "000035", "000033"),
+                    asList("000040", "000044", "000042", "000043", "000046",
+                        "000041", "000045")),
+                createCompactionEntry(4500,
+                    (currentTimeMillis - MINUTES.toMillis(6)),
+                    asList("000040", "000044", "000042", "000043", "000046",
+                        "000041", "000045", "000054", "000052", "000050",
+                        "000048"),
+                    asList("000059", "000055", "000056", "000060", "000057",
+                        "000058"))),
+            ImmutableSet.of("000059", "000055", "000056", "000060", "000057",
+                "000058", "000040", "000044", "000042", "000043", "000046",
+                "000041", "000045", "000054", "000052", "000050", "000048"),
+            4,
+            1
         )
     );
   }
@@ -1113,40 +1244,46 @@ public class TestRocksDBCheckpointDiffer {
   public void testPruneOlderSnapshotsWithCompactionHistory(
       String description,
       List<String> compactionLogs,
+      List<CompactionLogEntry> compactionLogEntries,
       Set<String> expectedNodes,
-      int expectedNumberOfLogFilesDeleted
+      int expectedNumberOfLogEntriesBeforePruning,
+      int expectedNumberOfLogEntriesAfterPruning
   ) throws IOException, ExecutionException, InterruptedException,
       TimeoutException {
     List<File> filesCreated = new ArrayList<>();
 
-    for (int i = 0; i < compactionLogs.size(); i++) {
-      String compactionFileName = metadataDirName + "/" + compactionLogDirName
-          + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
-      File compactionFile = new File(compactionFileName);
-      Files.write(compactionFile.toPath(),
-          compactionLogs.get(i).getBytes(UTF_8));
-      filesCreated.add(compactionFile);
+    if (compactionLogs != null) {
+      for (int i = 0; i < compactionLogs.size(); i++) {
+        String compactionFileName = metadataDirName + "/" + compactionLogDirName
+            + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
+        File compactionFile = new File(compactionFileName);
+        Files.write(compactionFile.toPath(),
+            compactionLogs.get(i).getBytes(UTF_8));
+        filesCreated.add(compactionFile);
+      }
+    } else if (compactionLogEntries != null) {
+      compactionLogEntries.forEach(entry ->
+          rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+    } else {
+      throw new IllegalArgumentException("One of compactionLog or" +
+          " compactionLogEntries should be present.");
     }
 
-    RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(metadataDirName,
-            sstBackUpDirName,
-            compactionLogDirName,
-            activeDbDirName,
-            config);
-
-    differ.loadAllCompactionLogs();
-
-    waitForLock(differ,
+    rocksDBCheckpointDiffer.loadAllCompactionLogs();
+    assertEquals(expectedNumberOfLogEntriesBeforePruning,
+        countEntriesInCompactionLogTable());
+    waitForLock(rocksDBCheckpointDiffer,
         RocksDBCheckpointDiffer::pruneOlderSnapshotsWithCompactionHistory);
 
-    Set<String> actualNodesInForwardDAG = differ.getForwardCompactionDAG()
+    Set<String> actualNodesInForwardDAG = rocksDBCheckpointDiffer
+        .getForwardCompactionDAG()
         .nodes()
         .stream()
         .map(CompactionNode::getFileName)
         .collect(Collectors.toSet());
 
-    Set<String> actualNodesBackwardDAG = differ.getBackwardCompactionDAG()
+    Set<String> actualNodesBackwardDAG = rocksDBCheckpointDiffer
+        .getBackwardCompactionDAG()
         .nodes()
         .stream()
         .map(CompactionNode::getFileName)
@@ -1155,15 +1292,25 @@ public class TestRocksDBCheckpointDiffer {
     assertEquals(expectedNodes, actualNodesInForwardDAG);
     assertEquals(expectedNodes, actualNodesBackwardDAG);
 
-    for (int i = 0; i < expectedNumberOfLogFilesDeleted; i++) {
+    for (int i = 0; compactionLogs != null && i < compactionLogs.size(); i++) {
       File compactionFile = filesCreated.get(i);
       assertFalse(compactionFile.exists());
     }
 
-    for (int i = expectedNumberOfLogFilesDeleted; i < compactionLogs.size();
-         i++) {
-      File compactionFile = filesCreated.get(i);
-      assertTrue(compactionFile.exists());
+    assertEquals(expectedNumberOfLogEntriesAfterPruning,
+        countEntriesInCompactionLogTable());
+  }
+
+  private int countEntriesInCompactionLogTable() {
+    try (ManagedRocksIterator iterator = new ManagedRocksIterator(
+        activeRocksDB.newIterator(compactionLogTableCFHandle))) {
+      iterator.get().seekToFirst();
+      int count = 0;
+      while (iterator.get().isValid()) {
+        iterator.get().next();
+        count++;
+      }
+      return count;
     }
   }
 
@@ -1191,20 +1338,43 @@ public class TestRocksDBCheckpointDiffer {
   }
 
   private static Stream<Arguments> sstFilePruningScenarios() {
+    List<String> initialFiles1 = Arrays.asList("000015", "000013", "000011",
+        "000009");
+    List<String> initialFiles2 = Arrays.asList("000015", "000013", "000011",
+        "000009", "000018", "000016", "000017", "000026", "000024", "000022",
+        "000020");
+    List<String> initialFiles3 = Arrays.asList("000015", "000013", "000011",
+        "000009", "000018", "000016", "000017", "000026", "000024", "000022",
+        "000020", "000027", "000030", "000028", "000031", "000029", "000039",
+        "000037", "000035", "000033", "000040", "000044", "000042", "000043",
+        "000046", "000041", "000045", "000054", "000052", "000050", "000048",
+        "000059", "000055", "000056", "000060", "000057", "000058");
+
+    List<String> expectedFiles1 = Arrays.asList("000015", "000013", "000011",
+        "000009");
+    List<String> expectedFiles2 = Arrays.asList("000015", "000013", "000011",
+        "000009", "000026", "000024", "000022", "000020");
+    List<String> expectedFiles3 = Arrays.asList("000013", "000024", "000035",
+        "000011", "000022", "000033", "000039", "000015", "000026", "000037",
+        "000048", "000009", "000050", "000054", "000020", "000052");
+
     return Stream.of(
-        Arguments.of("Case 1: No compaction.",
+        Arguments.of("Case 1 with compaction log file: " +
+                "No compaction.",
             "",
-            Arrays.asList("000015", "000013", "000011", "000009"),
-            Arrays.asList("000015", "000013", "000011", "000009")
+            null,
+            initialFiles1,
+            expectedFiles1
         ),
-        Arguments.of("Case 2: One level compaction.",
+        Arguments.of("Case 2 with compaction log file: " +
+                "One level compaction.",
             "C 1 000015,000013,000011,000009:000018,000016,000017\n",
-            Arrays.asList("000015", "000013", "000011", "000009", "000018",
-                "000016", "000017", "000026", "000024", "000022", "000020"),
-            Arrays.asList("000015", "000013", "000011", "000009", "000026",
-                "000024", "000022", "000020")
+            null,
+            initialFiles2,
+            expectedFiles2
         ),
-        Arguments.of("Case 3: Multi-level compaction.",
+        Arguments.of("Case 3 with compaction log file: " +
+                "Multi-level compaction.",
             "C 1 000015,000013,000011,000009:000018,000016,000017\n" +
                 "C 2 000018,000016,000017,000026,000024,000022,000020:000027," 
+
                 "000030,000028,000031,000029\n" +
@@ -1213,20 +1383,74 @@ public class TestRocksDBCheckpointDiffer {
                 "C 4 000040,000044,000042,000043,000046,000041,000045,000054," 
+
                 "000052,000050,000048:000059,000055,000056,000060,000057," +
                 "000058\n",
-            Arrays.asList("000015", "000013", "000011", "000009", "000018",
-                "000016", "000017", "000026", "000024", "000022", "000020",
-                "000027", "000030", "000028", "000031", "000029", "000039",
-                "000037", "000035", "000033", "000040", "000044", "000042",
-                "000043", "000046", "000041", "000045", "000054", "000052",
-                "000050", "000048", "000059", "000055", "000056", "000060",
-                "000057", "000058"),
-            Arrays.asList("000013", "000024", "000035", "000011", "000022",
-                "000033", "000039", "000015", "000026", "000037", "000048",
-                "000009", "000050", "000054", "000020", "000052")
+            null,
+            initialFiles3,
+            expectedFiles3
+        ),
+        Arguments.of("Case 4 with compaction log table: " +
+                "No compaction.",
+            null,
+            Collections.emptyList(),
+            initialFiles1,
+            expectedFiles1
+        ),
+        Arguments.of("Case 5 with compaction log table: " +
+                "One level compaction.",
+            null,
+            Collections.singletonList(createCompactionEntry(1,
+                now(),
+                asList("000015", "000013", "000011", "000009"),
+                asList("000018", "000016", "000017"))),
+            initialFiles2,
+            expectedFiles2
+        ),
+        Arguments.of("Case 6 with compaction log table: " +
+                "Multi-level compaction.",
+            null,
+            asList(createCompactionEntry(1,
+                    now(),
+                    asList("000015", "000013", "000011", "000009"),
+                    asList("000018", "000016", "000017")),
+                createCompactionEntry(2,
+                    now(),
+                    asList("000018", "000016", "000017", "000026", "000024",
+                        "000022", "000020"),
+                    asList("000027", "000030", "000028", "000031", "000029")),
+                createCompactionEntry(3,
+                    now(),
+                    asList("000027", "000030", "000028", "000031", "000029",
+                        "000039", "000037", "000035", "000033"),
+                    asList("000040", "000044", "000042", "000043", "000046",
+                        "000041", "000045")),
+                createCompactionEntry(4,
+                    now(),
+                    asList("000040", "000044", "000042", "000043", "000046",
+                        "000041", "000045", "000054", "000052", "000050",
+                        "000048"),
+                    asList("000059", "000055", "000056", "000060", "000057",
+                        "000058"))),
+            initialFiles3,
+            expectedFiles3
         )
     );
   }
 
+  private static CompactionLogEntry createCompactionEntry(
+      long dbSequenceNumber,
+      long compactionTime,
+      List<String> inputFiles,
+      List<String> outputFiles
+  ) {
+    return new CompactionLogEntry.Builder(dbSequenceNumber, compactionTime,
+        toFileInfoList(inputFiles), toFileInfoList(outputFiles)).build();
+  }
+
+  private static List<CompactionFileInfo> toFileInfoList(List<String> files) {
+    return files.stream()
+        .map(fileName -> new CompactionFileInfo.Builder(fileName).build())
+        .collect(Collectors.toList());
+  }
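
One property of these helpers worth noting: a trivial move (RocksDB
re-assigning files to another level without rewriting them, as seen in
the deleted text log above) is simply an entry whose input and output
lists are identical, e.g. (file numbers illustrative):

    CompactionLogEntry trivialMove = createCompactionEntry(
        5647L, System.currentTimeMillis(),
        asList("000071", "000064"),
        asList("000071", "000064"));  // outputs == inputs => trivial move
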
+
   /**
    * End-to-end test for SST file pruning.
    */
@@ -1235,30 +1459,37 @@ public class TestRocksDBCheckpointDiffer {
   public void testSstFilePruning(
       String description,
       String compactionLog,
+      List<CompactionLogEntry> compactionLogEntries,
       List<String> initialFiles,
       List<String> expectedFiles
   ) throws IOException, ExecutionException, InterruptedException,
       TimeoutException {
-    createFileWithContext(metadataDirName + "/" + compactionLogDirName
-            + "/compaction_log" + COMPACTION_LOG_FILE_NAME_SUFFIX,
-        compactionLog);
 
     for (String fileName : initialFiles) {
       createFileWithContext(sstBackUpDir + "/" + fileName + SST_FILE_EXTENSION,
           fileName);
     }
 
-    RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(metadataDirName,
-            sstBackUpDirName,
-            compactionLogDirName,
-            activeDbDirName,
-            config);
-
-    differ.loadAllCompactionLogs();
+    Path compactionLogFilePath = null;
+    if (compactionLog != null) {
+      String compactionLogFileName = metadataDirName + "/" +
+          compactionLogDirName + "/compaction_log" +
+          COMPACTION_LOG_FILE_NAME_SUFFIX;
+      compactionLogFilePath = new File(compactionLogFileName).toPath();
+      createFileWithContext(compactionLogFileName, compactionLog);
+      assertTrue(Files.exists(compactionLogFilePath));
+    } else if (compactionLogEntries != null) {
+      compactionLogEntries.forEach(entry ->
+          rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+    } else {
+      throw new IllegalArgumentException("One of compactionLog or" +
+          " compactionLogEntries should be present.");
+    }
 
-    waitForLock(differ, RocksDBCheckpointDiffer::pruneSstFiles);
+    rocksDBCheckpointDiffer.loadAllCompactionLogs();
 
+    waitForLock(rocksDBCheckpointDiffer,
+        RocksDBCheckpointDiffer::pruneSstFiles);
 
     Set<String> actualFileSetAfterPruning;
     try (Stream<Path> pathStream = Files.list(
@@ -1275,6 +1506,10 @@ public class TestRocksDBCheckpointDiffer {
 
     Set<String> expectedFileSet = new HashSet<>(expectedFiles);
     assertEquals(expectedFileSet, actualFileSetAfterPruning);
+
+    if (compactionLogFilePath != null) {
+      assertFalse(Files.exists(compactionLogFilePath));
+    }
   }
 
   private void createFileWithContext(String fileName, String context)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
index 10df45b268..8d61fb1a3f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
+import org.apache.ozone.rocksdiff.CompactionNode;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -55,12 +57,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -71,6 +69,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.stream.Collectors.toSet;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
@@ -404,36 +403,48 @@ public class TestSnapshotBackgroundServices {
         cluster.getOzoneManager(leaderOM.getOMNodeId());
     Assertions.assertEquals(leaderOM, newFollowerOM);
 
-    // Prepare baseline data for compaction logs
-    String currentCompactionLogPath = newLeaderOM
-        .getMetadataManager()
-        .getStore()
-        .getRocksDBCheckpointDiffer()
-        .getCurrentCompactionLogPath();
-    Assertions.assertNotNull(currentCompactionLogPath);
-    int lastIndex = currentCompactionLogPath.lastIndexOf(OM_KEY_PREFIX);
-    String compactionLogsPath = currentCompactionLogPath
-        .substring(0, lastIndex);
-    File compactionLogsDir = new File(compactionLogsPath);
-    Assertions.assertNotNull(compactionLogsDir);
-    File[] files = compactionLogsDir.listFiles();
-    Assertions.assertNotNull(files);
-    int numberOfLogFiles = files.length;
-    long contentLength;
-    Path currentCompactionLog = Paths.get(currentCompactionLogPath);
-    try (BufferedReader bufferedReader =
-             Files.newBufferedReader(currentCompactionLog)) {
-      contentLength = bufferedReader.lines()
-          .mapToLong(String::length)
-          .reduce(0L, Long::sum);
-    }
+    List<CompactionLogEntry> compactionLogEntriesOnPreviousLeader =
+        getCompactionLogEntries(leaderOM);
+
+    List<CompactionLogEntry> compactionLogEntriesOnNewLeader =
+        getCompactionLogEntries(newLeaderOM);
+    Assertions.assertEquals(compactionLogEntriesOnPreviousLeader,
+        compactionLogEntriesOnNewLeader);
+
+    Assertions.assertEquals(leaderOM.getMetadataManager().getStore()
+            .getRocksDBCheckpointDiffer().getForwardCompactionDAG().nodes()
+            .stream().map(CompactionNode::getFileName).collect(toSet()),
+        newLeaderOM.getMetadataManager().getStore()
+            .getRocksDBCheckpointDiffer().getForwardCompactionDAG().nodes()
+            .stream().map(CompactionNode::getFileName).collect(toSet()));
+    Assertions.assertEquals(leaderOM.getMetadataManager().getStore()
+            .getRocksDBCheckpointDiffer().getForwardCompactionDAG().edges()
+            .stream().map(edge ->
+                edge.source().getFileName() + "-" + edge.target().getFileName())
+            .collect(toSet()),
+        newLeaderOM.getMetadataManager().getStore()
+            .getRocksDBCheckpointDiffer().getForwardCompactionDAG().edges()
+            .stream().map(edge ->
+                edge.source().getFileName() + "-" + edge.target().getFileName())
+            .collect(toSet()));
 
-    checkIfCompactionLogsGetAppendedByForcingCompaction(newLeaderOM,
-        compactionLogsDir, numberOfLogFiles, contentLength,
-        currentCompactionLog);
+    confirmSnapDiffForTwoSnapshotsDifferingBySingleKey(newLeaderOM);
+  }
 
-    confirmSnapDiffForTwoSnapshotsDifferingBySingleKey(
-        newLeaderOM);
+  private List<CompactionLogEntry> getCompactionLogEntries(OzoneManager om)
+      throws IOException {
+    List<CompactionLogEntry> compactionLogEntries = new ArrayList<>();
+    try (TableIterator<String,
+        ? extends Table.KeyValue<String, CompactionLogEntry>>
+             iterator = om.getMetadataManager().getCompactionLogTable()
+        .iterator()) {
+      iterator.seekToFirst();
+
+      while (iterator.hasNext()) {
+        compactionLogEntries.add(iterator.next().getValue());
+      }
+    }
+    return compactionLogEntries;
   }
 
   @Test
@@ -546,28 +557,6 @@ public class TestSnapshotBackgroundServices {
     }, 1000, 10000);
   }
 
-  private static void checkIfCompactionLogsGetAppendedByForcingCompaction(
-      OzoneManager ozoneManager,
-      File compactionLogsDir, int numberOfLogFiles,
-      long contentLength, Path currentCompactionLog)
-      throws IOException {
-    ozoneManager.getMetadataManager()
-        .getStore()
-        .compactDB();
-    File[] files = compactionLogsDir.listFiles();
-    Assertions.assertNotNull(files);
-    int newNumberOfLogFiles = files.length;
-    long newContentLength;
-    try (BufferedReader bufferedReader =
-             Files.newBufferedReader(currentCompactionLog)) {
-      newContentLength = bufferedReader.lines()
-          .mapToLong(String::length)
-          .reduce(0L, Long::sum);
-    }
-    Assertions.assertTrue(numberOfLogFiles < newNumberOfLogFiles
-        || contentLength < newContentLength);
-  }
-
   private static File getSstBackupDir(OzoneManager ozoneManager) {
     String sstBackupDirPath = ozoneManager
         .getMetadataManager()
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 1bcaefccfc..4ec14e1596 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -462,28 +462,6 @@ public final class OmSnapshotManager implements AutoCloseable {
           dbCheckpoint.getCheckpointLocation(), snapshotInfo.getName());
     }
 
-    final RocksDBCheckpointDiffer dbCpDiffer =
-        store.getRocksDBCheckpointDiffer();
-
-    if (dbCpDiffer != null) {
-      final long dbLatestSequenceNumber = snapshotInfo.getDbTxSequenceNumber();
-
-      Objects.requireNonNull(snapshotInfo.getSnapshotId(),
-          "SnapshotId is null for snapshot: " + snapshotInfo.getName());
-      // Write snapshot generation (latest sequence number) to compaction log.
-      // This will be used for DAG reconstruction as snapshotGeneration.
-      dbCpDiffer.appendSnapshotInfoToCompactionLog(dbLatestSequenceNumber,
-          snapshotInfo.getSnapshotId().toString(),
-          snapshotInfo.getCreationTime());
-
-      // Set compaction log filename to the latest DB sequence number
-      // right after taking the RocksDB checkpoint for Ozone snapshot.
-      //
-    // Note it doesn't matter if sequence number hasn't increased (even though
-      // it shouldn't happen), since the writer always appends the file.
-      dbCpDiffer.setCurrentCompactionLog(dbLatestSequenceNumber);
-    }
-
     return dbCheckpoint;
   }
 

