This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new 9fe6d106ac HDDS-7524. Compaction DAG node pruning (#4045)
9fe6d106ac is described below

commit 9fe6d106acc71fa79402880ba6344fcab131185d
Author: Hemant Kumar <[email protected]>
AuthorDate: Tue Jan 3 01:45:47 2023 -0800

    HDDS-7524. Compaction DAG node pruning (#4045)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  17 +
 .../common/src/main/resources/ozone-default.xml    |  19 +
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |  20 +-
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |  11 +-
 hadoop-hdds/rocksdb-checkpoint-differ/pom.xml      |   1 +
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 493 +++++++++++++++----
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 534 ++++++++++++++++++---
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |   5 +-
 8 files changed, 957 insertions(+), 143 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 74ed487e8c..8011e35e31 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -534,6 +534,23 @@ public final class OzoneConfigKeys {
   public static final String OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE =
       "ozone.om.snapshot.cache.max.size";
   public static final int OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT = 10;
+
+  public static final String
+      OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED =
+      "ozone.om.snapshot.compaction.dag.max.time.allowed";
+
+  public static final long
+      OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT =
+      TimeUnit.DAYS.toMillis(30);
+
+  public static final String
+      OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL =
+      "ozone.om.snapshot.compaction.dag.prune.daemon.run.interval";
+
+  public static final long
+      OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT =
+      TimeUnit.HOURS.toMillis(1);
+
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 3e8499bf71..64977ecd32 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3407,4 +3407,23 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.snapshot.compaction.dag.max.time.allowed</name>
+    <value>30d</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      Maximum time a snapshot is allowed to stay in the compaction DAG before it gets pruned out by the pruning daemon.
+      Defaults to milliseconds when no time unit is specified.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.snapshot.compaction.dag.prune.daemon.run.interval</name>
+    <value>3600s</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      Interval at which the compaction DAG pruning daemon thread runs to remove older snapshots with compaction
+      history from the compaction DAG. Defaults to milliseconds when no time unit is specified.
+    </description>
+  </property>
 </configuration>
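
Not part of this patch, but as a quick illustration of how these properties behave: getTimeDuration (the accessor DBStoreBuilder uses below) accepts suffixed values such as 30d or 3600s and treats a bare number as milliseconds. A minimal, hypothetical sketch; the class and variable names are made up:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

// Illustrative only: resolves the new pruning-related properties the same
// way DBStoreBuilder does in this commit.
public final class SnapshotPruneConfigExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Values may carry a time-unit suffix ("30d", "3600s"); a bare number is
    // read as milliseconds.
    conf.set("ozone.om.snapshot.compaction.dag.max.time.allowed", "30d");
    conf.set("ozone.om.snapshot.compaction.dag.prune.daemon.run.interval",
        "3600s");

    long maxAgeMs = conf.getTimeDuration(
        "ozone.om.snapshot.compaction.dag.max.time.allowed",
        TimeUnit.DAYS.toMillis(30), TimeUnit.MILLISECONDS);
    long pruneIntervalMs = conf.getTimeDuration(
        "ozone.om.snapshot.compaction.dag.prune.daemon.run.interval",
        TimeUnit.HOURS.toMillis(1), TimeUnit.MILLISECONDS);

    System.out.println("max time in DAG (ms): " + maxAgeMs);       // 2592000000
    System.out.println("prune interval (ms): " + pruneIntervalMs); // 3600000
  }
}
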
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 5aad1aa526..c3b0151cb9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -44,6 +45,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKS
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT;
 import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
 
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -98,6 +103,8 @@ public final class DBStoreBuilder {
   private int maxFSSnapshots = 0;
   private final DBProfile defaultCfProfile;
   private boolean enableCompactionLog;
+  private long maxTimeAllowedForSnapshotInDag;
+  private long pruneCompactionDagDaemonRunInterval;
 
   /**
    * Create DBStoreBuilder from a generic DBDefinition.
@@ -145,6 +152,16 @@ public final class DBStoreBuilder {
     defaultCfProfile = this.configuration.getEnum(HDDS_DB_PROFILE,
           HDDS_DEFAULT_DB_PROFILE);
     LOG.debug("Default DB profile:{}", defaultCfProfile);
+
+    maxTimeAllowedForSnapshotInDag = configuration.getTimeDuration(
+        OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED,
+        OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    pruneCompactionDagDaemonRunInterval = configuration.getTimeDuration(
+        OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL,
+        OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
   }
 
   private void applyDBDefinition(DBDefinition definition) {
@@ -201,7 +218,8 @@ public final class DBStoreBuilder {
 
       return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
           registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName,
-          enableCompactionLog);
+          enableCompactionLog, maxTimeAllowedForSnapshotInDag,
+          pruneCompactionDagDaemonRunInterval);
     } finally {
       tableConfigs.forEach(TableConfig::close);
     }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index adafe079a8..2535ff6619 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.utils.RocksDBStoreMBean;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
@@ -80,14 +81,17 @@ public class RDBStore implements DBStore {
   public RDBStore(File dbFile, ManagedDBOptions options,
                   Set<TableConfig> families) throws IOException {
     this(dbFile, options, new ManagedWriteOptions(), families,
-        new CodecRegistry(), false, 1000, null, false);
+        new CodecRegistry(), false, 1000, null, false,
+        TimeUnit.DAYS.toMillis(1), TimeUnit.HOURS.toMillis(1));
   }
 
   @SuppressWarnings("parameternumber")
   public RDBStore(File dbFile, ManagedDBOptions dbOptions,
                   ManagedWriteOptions writeOptions, Set<TableConfig> families,
                   CodecRegistry registry, boolean readOnly, int maxFSSnapshots,
-                  String dbJmxBeanNameName, boolean enableCompactionLog)
+                  String dbJmxBeanNameName, boolean enableCompactionLog,
+                  long maxTimeAllowedForSnapshotInDag,
+                  long compactionDagDaemonInterval)
       throws IOException {
     Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
     Preconditions.checkNotNull(families);
@@ -101,7 +105,8 @@ public class RDBStore implements DBStore {
       if (enableCompactionLog) {
         rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(
             dbLocation.getParent(), dbCompactionSSTBackupDirName,
-            dbCompactionLogDirName, dbLocation);
+            dbCompactionLogDirName, dbLocation, maxTimeAllowedForSnapshotInDag,
+            compactionDagDaemonInterval);
         rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
       } else {
         rocksDBCheckpointDiffer = null;
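
For orientation only, a hypothetical sketch of the wiring this hunk enables: RDBStore now forwards the two durations into the differ's constructor, and (as the differ diff further below shows) the object becomes AutoCloseable so its pruning executor can be shut down. All paths and values here are made up, and in practice RDBStore owns this object rather than user code:

import java.io.File;
import java.util.concurrent.TimeUnit;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;

public final class DifferWiringSketch {
  public static void main(String[] args) throws Exception {
    // Made-up locations; normally derived from the OM metadata dir.
    File activeDbLocation = new File("/tmp/om-metadata/om.db");
    try (RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
        "/tmp/om-metadata",             // metadata dir
        "compaction-sst-backup",        // SST backup dir name
        "compaction-log",               // compaction log dir name
        activeDbLocation,
        TimeUnit.DAYS.toMillis(30),     // max time a snapshot stays in the DAG
        TimeUnit.HOURS.toMillis(1))) {  // prune daemon run interval
      differ.loadAllCompactionLogs();   // rebuild the DAG from existing logs
      // ... normal operation; the daemon prunes expired snapshots in the
      // background at every interval.
    } // close() shuts the scheduled executor down.
  }
}
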
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index b288364f73..3c28aac8cb 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -88,6 +88,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-params</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 7a8ade9f5f..9b015b24e1 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -21,6 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.graph.GraphBuilder;
 import com.google.common.graph.MutableGraph;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
 import org.rocksdb.AbstractEventListener;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -53,7 +58,6 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Scanner;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -83,7 +87,7 @@ import static java.util.Arrays.asList;
  * It is important to note that compaction log is per-DB instance. Since
  * each OM DB instance might trigger compactions at different timings.
  */
-public class RocksDBCheckpointDiffer {
+public class RocksDBCheckpointDiffer implements AutoCloseable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RocksDBCheckpointDiffer.class);
@@ -91,7 +95,7 @@ public class RocksDBCheckpointDiffer {
   private final String sstBackupDir;
   private final String activeDBLocationStr;
 
-  private String compactionLogDir = null;
+  private final String compactionLogDir;
 
   /**
    * Compaction log path for DB compaction history persistence.
@@ -103,7 +107,7 @@ public class RocksDBCheckpointDiffer {
    */
   private volatile String currentCompactionLogPath = null;
 
-  private static final String COMPACTION_LOG_FILENAME_SUFFIX = ".log";
+  static final String COMPACTION_LOG_FILE_NAME_SUFFIX = ".log";
 
   /**
    * Marks the beginning of a comment line in the compaction log.
@@ -119,7 +123,22 @@ public class RocksDBCheckpointDiffer {
    * Prefix for the sequence number line when writing to compaction log
    * right after taking an Ozone snapshot.
    */
-  private static final String COMPACTION_LOG_SEQNUM_LINE_PREFIX = "S ";
+  private static final String COMPACTION_LOG_SEQ_NUM_LINE_PREFIX = "S ";
+
+  /**
+   * Delimiter used to join compaction's input and output files.
+   * e.g. input1,input2,input3 or output1,output2,output3
+   */
+  private static final String COMPACTION_LOG_ENTRY_FILE_DELIMITER = ",";
+
+  private static final String SPACE_DELIMITER = " ";
+
+  /**
+   * Delimiter used to join compaction's input and output file set strings.
+   * e.g. input1,input2,input3:output1,output2,output3
+   */
+  private static final String COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER
+      = ":";
 
   /**
    * SST file extension. Must be lower case.
@@ -130,7 +149,7 @@ public class RocksDBCheckpointDiffer {
   private static final int SST_FILE_EXTENSION_LENGTH =
       SST_FILE_EXTENSION.length();
 
-  private static final int LONG_MAX_STRLEN =
+  private static final int LONG_MAX_STR_LEN =
       String.valueOf(Long.MAX_VALUE).length();
 
   /**
@@ -143,83 +162,107 @@ public class RocksDBCheckpointDiffer {
    * Dummy object that acts as a write lock in compaction listener.
    */
   private final Object compactionListenerWriteLock = new Object();
-  /**
-   * Flag for testing. Skips SST file summary reader.
-   */
-  private boolean skipGetSSTFileSummary = false;
+
+  private final ScheduledExecutorService executor;
+  private final long maxAllowedTimeInDag;
 
   /**
    * Constructor.
    * Note that previous compaction logs are loaded by RDBStore after this
    * object's initialization by calling loadAllCompactionLogs().
+   *
    * @param metadataDir Ozone metadata directory.
    * @param sstBackupDir Name of the SST backup dir under metadata dir.
    * @param compactionLogDirName Name of the compaction log dir.
+   * @param activeDBLocation Active RocksDB directory's location.
+   * @param maxTimeAllowedForSnapshotInDagInMs Time after which snapshot will be
+   *                                           pruned from the DAG by the daemon.
+   * @param pruneCompactionDagDaemonRunIntervalInMs Interval at which the DAG
+   *                                               pruning daemon will run.
    */
-  public RocksDBCheckpointDiffer(String metadataDir, String sstBackupDir,
-      String compactionLogDirName, File activeDBLocation) {
-
-    setCompactionLogDir(metadataDir, compactionLogDirName);
-
+  public RocksDBCheckpointDiffer(String metadataDir,
+                                 String sstBackupDir,
+                                 String compactionLogDirName,
+                                 File activeDBLocation,
+                                 long maxTimeAllowedForSnapshotInDagInMs,
+                                 long pruneCompactionDagDaemonRunIntervalInMs) {
+    this.compactionLogDir =
+        createCompactionLogDir(metadataDir, compactionLogDirName);
     this.sstBackupDir = Paths.get(metadataDir, sstBackupDir) + "/";
-
-    // Create the directory if SST backup path does not already exist
-    File dir = new File(this.sstBackupDir);
-    if (!dir.exists() && !dir.mkdir()) {
-      final String errorMsg = "Failed to create SST file backup directory. "
-          + "Check if OM has write permission.";
-      LOG.error(errorMsg);
-      throw new RuntimeException(errorMsg);
-    }
+    createSstBackUpDir();
 
     // Active DB location is used in getSSTFileSummary
     this.activeDBLocationStr = activeDBLocation.toString() + "/";
+    this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
+    this.executor = Executors.newSingleThreadScheduledExecutor();
+
+    if (pruneCompactionDagDaemonRunIntervalInMs > 0) {
+      this.executor.scheduleWithFixedDelay(
+          this::pruneOlderSnapshotsWithCompactionHistory,
+          pruneCompactionDagDaemonRunIntervalInMs,
+          pruneCompactionDagDaemonRunIntervalInMs,
+          TimeUnit.MILLISECONDS);
+    }
   }
 
-  /**
-   * This constructor is only meant for unit testing.
-   */
-  @VisibleForTesting
-  RocksDBCheckpointDiffer() {
-    this.skipGetSSTFileSummary = true;
-    this.sstBackupDir = null;
-    this.activeDBLocationStr = null;
+  public RocksDBCheckpointDiffer(String sstBackupDir,
+                                 String compactionLogDirName,
+                                 String activeDBLocationName,
+                                 long maxTimeAllowedForSnapshotInDagInMs) {
+    this.compactionLogDir = compactionLogDirName;
+    this.sstBackupDir = sstBackupDir;
+    this.activeDBLocationStr = activeDBLocationName;
+    this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
+    this.executor = null;
   }
 
-  private void setCompactionLogDir(String metadataDir,
-      String compactionLogDirName) {
+  private String createCompactionLogDir(String metadataDir,
+                                        String compactionLogDirName) {
 
     final File parentDir = new File(metadataDir);
     if (!parentDir.exists()) {
       if (!parentDir.mkdir()) {
         LOG.error("Error creating compaction log parent dir.");
-        return;
+        return null;
       }
     }
 
-    this.compactionLogDir =
+    final String compactionLogDirectory =
         Paths.get(metadataDir, compactionLogDirName).toString();
-    File clDir = new File(compactionLogDir);
+    File clDir = new File(compactionLogDirectory);
     if (!clDir.exists() && !clDir.mkdir()) {
       LOG.error("Error creating compaction log dir.");
-      return;
+      return null;
     }
 
     // Create a readme file explaining what the compaction log dir is for
-    final Path readmePath = Paths.get(compactionLogDir, "_README.txt");
+    final Path readmePath = Paths.get(compactionLogDirectory, "_README.txt");
     final File readmeFile = new File(readmePath.toString());
     if (!readmeFile.exists()) {
       try (BufferedWriter bw = Files.newBufferedWriter(
           readmePath, StandardOpenOption.CREATE)) {
-        bw.write("This directory holds Ozone Manager RocksDB compaction logs.\n"
-            + "DO NOT add, change or delete any files in this directory unless "
-            + "you know what you are doing.\n");
+        bw.write("This directory holds Ozone Manager RocksDB compaction" +
+            " logs.\nDO NOT add, change or delete any files in this directory" +
+            " unless you know what you are doing.\n");
       } catch (IOException ignored) {
       }
     }
 
-    // Append /
-    this.compactionLogDir += "/";
+    // Append '/' to make it a directory path.
+    return compactionLogDirectory + "/";
+  }
+
+  /**
+   * Create the directory if SST backup path does not already exist.
+   */
+  private void createSstBackUpDir() {
+    File dir = new File(this.sstBackupDir);
+    if (!dir.exists() && !dir.mkdir()) {
+      String errorMsg = "Failed to create SST file backup directory. "
+          + "Check if OM has write permission.";
+      LOG.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
   }
 
   /**
@@ -229,16 +272,16 @@ public class RocksDBCheckpointDiffer {
   public void setCurrentCompactionLog(long latestSequenceNum) {
     String latestSequenceIdStr = String.valueOf(latestSequenceNum);
 
-    if (latestSequenceIdStr.length() < LONG_MAX_STRLEN) {
+    if (latestSequenceIdStr.length() < LONG_MAX_STR_LEN) {
       // Pad zeroes to the left for ordered file listing when sorted
       // alphabetically.
       latestSequenceIdStr =
-          StringUtils.leftPad(latestSequenceIdStr, LONG_MAX_STRLEN, "0");
+          StringUtils.leftPad(latestSequenceIdStr, LONG_MAX_STR_LEN, "0");
     }
 
     // Local temp variable for storing the new compaction log file path
-    final String newCompactionLog =
-        compactionLogDir + latestSequenceIdStr + COMPACTION_LOG_FILENAME_SUFFIX;
+    final String newCompactionLog = compactionLogDir + latestSequenceIdStr +
+        COMPACTION_LOG_FILE_NAME_SUFFIX;
 
     File clFile = new File(newCompactionLog);
     if (clFile.exists()) {
@@ -251,6 +294,13 @@ public class RocksDBCheckpointDiffer {
     appendToCurrentCompactionLog("");
   }
 
+  @Override
+  public void close() throws Exception {
+    if (executor != null) {
+      executor.shutdown();
+    }
+  }
+
   // Hash table to track CompactionNode for a given SST File.
   private final ConcurrentHashMap<String, CompactionNode> compactionNodeMap =
       new ConcurrentHashMap<>();
@@ -309,12 +359,12 @@ public class RocksDBCheckpointDiffer {
   /**
    * Append a sequence number to the compaction log (roughly) when an Ozone
    * snapshot (RDB checkpoint) is taken.
-   * @param sequenceNum RDB sequence number
    */
-  public void appendSequenceNumberToCompactionLog(long sequenceNum,
-      String snapshotID) {
-    final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum +
-        " " + snapshotID + "\n";
+  public void appendSnapshotInfoToCompactionLog(long sequenceNum,
+                                                String snapshotID,
+                                                long creationTime) {
+    final String line = COMPACTION_LOG_SEQ_NUM_LINE_PREFIX + sequenceNum +
+        SPACE_DELIMITER + snapshotID + SPACE_DELIMITER + creationTime + "\n";
     appendToCurrentCompactionLog(line);
   }
 
@@ -430,17 +480,19 @@ public class RocksDBCheckpointDiffer {
           // Trim the file path, leave only the SST file name without extension
           inputFiles.replaceAll(s -> s.substring(
               filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
-          final String inputFilesJoined = String.join(",", inputFiles);
+          final String inputFilesJoined =
+              String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, inputFiles);
           sb.append(inputFilesJoined);
 
-          // Insert delimiter between input files an output files
-          sb.append(':');
+          // Insert delimiter between input files and output files
+          sb.append(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
 
           // Append the list of output files
           final List<String> outputFiles = compactionJobInfo.outputFiles();
           outputFiles.replaceAll(s -> s.substring(
               filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
-          final String outputFilesJoined = String.join(",", outputFiles);
+          final String outputFilesJoined =
+              String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, outputFiles);
           sb.append(outputFilesJoined);
 
           // End of line
@@ -454,11 +506,6 @@ public class RocksDBCheckpointDiffer {
           //  snapshotChainManager.getLatestGlobalSnapshot()
           populateCompactionDAG(inputFiles, outputFiles, null,
               db.getLatestSequenceNumber());
-/*
-          if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
-            printMutableGraph(null, null, compactionDAGFwd);
-          }
- */
         }
       }
     };
@@ -471,7 +518,7 @@ public class RocksDBCheckpointDiffer {
    */
   private long getSSTFileSummary(String filename) throws RocksDBException {
 
-    if (skipGetSSTFileSummary) {
+    if (activeDBLocationStr == null) {
       // For testing only
       return 1L;
     }
@@ -596,23 +643,18 @@ public class RocksDBCheckpointDiffer {
     if (line.startsWith("#")) {
       // Skip comments
       LOG.debug("Comment line, skipped");
-    } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
-      // Read sequence number, and snapshot ID
-      LOG.debug("Reading sequence number as snapshot generation, "
-          + "and snapshot ID");
-      final String trimmedStr =
-          line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
-      final Scanner input = new Scanner(trimmedStr);
-      // This would the snapshot generation for the nodes to come
-      reconstructionSnapshotGeneration = input.nextLong();
-      // This is the snapshotID assigned to every single CompactionNode to come
-      reconstructionLastSnapshotID = input.nextLine().trim();
+    } else if (line.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
+      SnapshotLogInfo snapshotLogInfo = getSnapshotLogInfo(line);
+      reconstructionSnapshotGeneration = snapshotLogInfo.snapshotGenerationId;
+      reconstructionLastSnapshotID = snapshotLogInfo.snapshotId;
     } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
       // Read compaction log entry
 
       // Trim the beginning
       line = line.substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
-      final String[] io = line.split(":");
+      String[] io =
+          line.split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
+
       if (io.length != 2) {
         if (line.endsWith(":")) {
           LOG.debug("Ignoring compaction log line for SST deletion");
@@ -621,8 +663,9 @@ public class RocksDBCheckpointDiffer {
         }
         return;
       }
-      final String[] inputFiles = io[0].split(",");
-      final String[] outputFiles = io[1].split(",");
+
+      String[] inputFiles = io[0].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
+      String[] outputFiles = io[1].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
       populateCompactionDAG(asList(inputFiles), asList(outputFiles),
           reconstructionLastSnapshotID, reconstructionSnapshotGeneration);
     } else {
@@ -655,7 +698,7 @@ public class RocksDBCheckpointDiffer {
     try {
       try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
           .filter(e -> e.toString().toLowerCase()
-              .endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+              .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
           .sorted()) {
         for (Path logPath : pathStream.collect(Collectors.toList())) {
           readCompactionLogToDAG(logPath.toString());
@@ -699,14 +742,14 @@ public class RocksDBCheckpointDiffer {
 
       logSB.append("Fwd DAG same SST files:      ");
       for (String file : fwdDAGSameFiles) {
-        logSB.append(file).append(" ");
+        logSB.append(file).append(SPACE_DELIMITER);
       }
       LOG.debug(logSB.toString());
 
       logSB.setLength(0);
       logSB.append("Fwd DAG different SST files: ");
       for (String file : fwdDAGDifferentFiles) {
-        logSB.append(file).append(" ");
+        logSB.append(file).append(SPACE_DELIMITER);
       }
       LOG.debug("{}", logSB);
     }
@@ -939,6 +982,295 @@ public class RocksDBCheckpointDiffer {
 
   }
 
+  /**
+   * This is the task that runs periodically on the service executor at a
+   * fixed delay.
+   * It looks for snapshots in the compaction DAG which are older than the
+   * allowed time and removes them from the DAG.
+   */
+  public void pruneOlderSnapshotsWithCompactionHistory() {
+    List<Path> olderSnapshotsLogFilePaths =
+        getOlderSnapshotsCompactionLogFilePaths();
+    List<String> lastCompactionSstFiles =
+        getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+
+    Set<String> sstFileNodesRemoved =
+        pruneSnapshotFileNodesFromDag(lastCompactionSstFiles);
+    removeSstFile(sstFileNodesRemoved);
+    deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+  }
+
+  /**
+   * Deletes the SST file from the backup directory if it exists.
+   */
+  private void removeSstFile(Set<String> sstFileNodes) {
+    for (String sstFileNode: sstFileNodes) {
+      File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+      try {
+        Files.deleteIfExists(file.toPath());
+      } catch (IOException exception) {
+        LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
+      }
+    }
+  }
+
+  /**
+   * Returns the list of compaction log files which are older than the allowed
+   * max time in the compaction DAG.
+   */
+  private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+    long compactionLogPruneStartTime = System.currentTimeMillis();
+
+    List<Path> compactionFiles =
+        listCompactionLogFileFromCompactionLogDirectory();
+
+    int index = compactionFiles.size() - 1;
+    for (; index >= 0; index--) {
+      Path compactionLogPath = compactionFiles.get(index);
+      SnapshotLogInfo snapshotLogInfo =
+          getSnapshotInfoFromLog(compactionLogPath);
+
+      if (snapshotLogInfo == null) {
+        continue;
+      }
+
+      if (compactionLogPruneStartTime - snapshotLogInfo.snapshotCreatedAt >
+          maxAllowedTimeInDag) {
+        break;
+      }
+    }
+
+    if (index >= 0) {
+      return compactionFiles.subList(0, index + 1);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Returns the list of compaction log file paths from the compaction log
+   * directory.
+   */
+  private List<Path> listCompactionLogFileFromCompactionLogDirectory() {
+    try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+        .filter(e -> e.toString().toLowerCase()
+            .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
+        .sorted()) {
+      return pathStream.collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Error listing compaction log dir " +
+          compactionLogDir, e);
+    }
+  }
+
+  public void deleteOlderSnapshotsCompactionFiles(
+      List<Path> olderSnapshotsLogFilePaths) {
+
+    for (int i = 0; i < olderSnapshotsLogFilePaths.size(); i++) {
+      Path olderSnapshotsLogFilePath = olderSnapshotsLogFilePaths.get(i);
+      try {
+        Files.deleteIfExists(olderSnapshotsLogFilePath);
+      } catch (IOException exception) {
+        LOG.error("Failed to deleted SST file: {}", olderSnapshotsLogFilePath,
+            exception);
+      }
+    }
+  }
+
+  /**
+   * Prunes forward and backward DAGs when the oldest snapshot with compaction
+   * history gets deleted.
+   */
+  public Set<String> pruneSnapshotFileNodesFromDag(List<String> sstFileNodes) {
+    Set<CompactionNode> startNodes = new HashSet<>();
+    for (String sstFileNode : sstFileNodes) {
+      CompactionNode infileNode = compactionNodeMap.get(sstFileNode);
+      if (infileNode == null) {
+        LOG.warn("Compaction node doesn't exist for sstFile: {}.", 
sstFileNode);
+        continue;
+      }
+
+      startNodes.add(infileNode);
+    }
+
+    pruneBackwardDag(backwardCompactionDAG, startNodes);
+    Set<String> sstFilesPruned = pruneForwardDag(forwardCompactionDAG,
+        startNodes);
+
+    // Remove SST file nodes from compactionNodeMap too,
+    // since those nodes won't be needed after clean up.
+    sstFilesPruned.forEach(compactionNodeMap::remove);
+
+    return sstFilesPruned;
+  }
+
+  /**
+   * Prunes the backward DAG upstream from the level that needs to be removed.
+   */
+  @VisibleForTesting
+  Set<String> pruneBackwardDag(MutableGraph<CompactionNode> backwardDag,
+                               Set<CompactionNode> startNodes) {
+    Set<String> removedFiles = new HashSet<>();
+    Set<CompactionNode> currentLevel = startNodes;
+
+    while (!currentLevel.isEmpty()) {
+      Set<CompactionNode> nextLevel = new HashSet<>();
+      for (CompactionNode current : currentLevel) {
+        if (!backwardDag.nodes().contains(current)) {
+          continue;
+        }
+
+        nextLevel.addAll(backwardDag.predecessors(current));
+        backwardDag.removeNode(current);
+        removedFiles.add(current.getFileName());
+      }
+      currentLevel = nextLevel;
+    }
+
+    return removedFiles;
+  }
+
+  /**
+   * Prunes forward DAG's downstream from the level that needs to be removed.
+   */
+  @VisibleForTesting
+  Set<String> pruneForwardDag(MutableGraph<CompactionNode> forwardDag,
+                              Set<CompactionNode> startNodes) {
+    Set<String> removedFiles = new HashSet<>();
+    Set<CompactionNode> currentLevel = new HashSet<>(startNodes);
+
+    while (!currentLevel.isEmpty()) {
+      Set<CompactionNode> nextLevel = new HashSet<>();
+      for (CompactionNode current : currentLevel) {
+        if (!forwardDag.nodes().contains(current)) {
+          continue;
+        }
+
+        nextLevel.addAll(forwardDag.successors(current));
+        forwardDag.removeNode(current);
+        removedFiles.add(current.getFileName());
+      }
+
+      currentLevel = nextLevel;
+    }
+
+    return removedFiles;
+  }
+
+  private SnapshotLogInfo getSnapshotInfoFromLog(Path compactionLogFile) {
+    AtomicReference<SnapshotLogInfo> snapshotLogInfo =
+        new AtomicReference<>();
+    try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+      logStream.forEach(logLine -> {
+        if (!logLine.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
+          return;
+        }
+
+        snapshotLogInfo.set(getSnapshotLogInfo(logLine));
+      });
+    } catch (IOException exception) {
+      throw new RuntimeException("Failed to read compaction log file: " +
+          compactionLogFile, exception);
+    }
+
+    return snapshotLogInfo.get();
+  }
+
+  /**
+   * Converts a snapshot compaction log line to SnapshotLogInfo.
+   */
+  private SnapshotLogInfo getSnapshotLogInfo(String logLine) {
+    // Remove `S ` from the line.
+    String line =
+        logLine.substring(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX.length());
+
+    String[] splits = line.split(SPACE_DELIMITER);
+    Preconditions.checkArgument(splits.length == 3,
+        "Snapshot info log statement has more than expected parameters.");
+
+    return new SnapshotLogInfo(Long.parseLong(splits[0]),
+        splits[1],
+        Long.parseLong(splits[2]));
+  }
+
+  /**
+   * Returns the list of SST files that got compacted in the last compaction
+   * from the provided list of compaction log files.
+   * We can't simply use the last file from the list because it is possible
+   * that no compaction happened between the last snapshot and the one before.
+   * Hence, we go over the list in reverse order and return the SST files from
+   * the first compaction encountered in the reversed list.
+   * If no compaction happened at all, it returns an empty list.
+   */
+  private List<String> getLastCompactionSstFiles(
+      List<Path> compactionLogFiles
+  ) {
+
+    if (compactionLogFiles.isEmpty()) {
+      return Collections.emptyList();
+    }
+    compactionLogFiles = new ArrayList<>(compactionLogFiles);
+    Collections.reverse(compactionLogFiles);
+
+    for (Path compactionLogFile: compactionLogFiles) {
+      List<String> sstFiles = getLastCompactionSstFiles(compactionLogFile);
+      if (!sstFiles.isEmpty()) {
+        return sstFiles;
+      }
+    }
+
+    return Collections.emptyList();
+  }
+
+  private List<String> getLastCompactionSstFiles(Path compactionLogFile) {
+
+    AtomicReference<String> sstFiles = new AtomicReference<>();
+
+    try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+      logStream.forEach(logLine -> {
+        if (!logLine.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+          return;
+        }
+        sstFiles.set(logLine);
+      });
+    } catch (IOException exception) {
+      throw new RuntimeException("Failed to read file: " + compactionLogFile,
+          exception);
+    }
+
+    String lastCompactionLogEntry = sstFiles.get();
+
+    if (StringUtils.isEmpty(lastCompactionLogEntry)) {
+      return Collections.emptyList();
+    }
+
+    // Trim the beginning
+    lastCompactionLogEntry = lastCompactionLogEntry
+        .substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
+
+    String[] io = lastCompactionLogEntry
+        .split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
+
+    assert (io.length == 2);
+
+    String[] outputFiles = io[1].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
+
+    return asList(outputFiles);
+  }
+
+  private static final class SnapshotLogInfo {
+    private final long snapshotGenerationId;
+    private final String snapshotId;
+    private final long snapshotCreatedAt;
+
+    private SnapshotLogInfo(long snapshotGenerationId,
+                            String snapshotId,
+                            long snapshotCreatedAt) {
+      this.snapshotGenerationId = snapshotGenerationId;
+      this.snapshotId = snapshotId;
+      this.snapshotCreatedAt = snapshotCreatedAt;
+    }
+  }
+
   @VisibleForTesting
   public boolean debugEnabled(Integer level) {
     return DEBUG_LEVEL.contains(level);
@@ -953,5 +1285,4 @@ public class RocksDBCheckpointDiffer {
   public ConcurrentHashMap<String, CompactionNode> getCompactionNodeMap() {
     return compactionNodeMap;
   }
-
 }
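
To make the extended log format concrete: a snapshot line now carries three space-separated fields, S <dbSequenceNumber> <snapshotId> <creationTimeMillis>, while a compaction line keeps the C <inputs>:<outputs> form with comma-separated SST file names (see the test fixtures below). A small standalone parsing sketch, with a made-up timestamp value:

// Illustrative parser for the two compaction-log line formats used above.
public final class CompactionLogLineSketch {
  public static void main(String[] args) {
    // Sample lines in the format written by appendSnapshotInfoToCompactionLog
    // and the compaction listener; the timestamp value is made up.
    String snapshotLine =
        "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd 1672732800000";
    String compactionLine = "C 000068,000062:000069";

    // "S <sequenceNumber> <snapshotId> <creationTimeMillis>"
    String[] fields = snapshotLine.substring("S ".length()).split(" ");
    long sequenceNumber = Long.parseLong(fields[0]);
    String snapshotId = fields[1];
    long creationTimeMillis = Long.parseLong(fields[2]);

    // "C <input1,input2,...>:<output1,output2,...>"
    String[] io = compactionLine.substring("C ".length()).split(":");
    String[] inputFiles = io[0].split(",");
    String[] outputFiles = io[1].split(",");

    System.out.printf("seq=%d id=%s createdAt=%d%n",
        sequenceNumber, snapshotId, creationTimeMillis);
    System.out.printf("inputs=%d outputs=%d%n",
        inputFiles.length, outputFiles.length);
  }
}
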
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index 590e740575..af5aec638a 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -19,10 +19,8 @@ package org.apache.ozone.rocksdiff;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Arrays.asList;
-import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
-import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
-import static org.junit.jupiter.api.Assertions.fail;
-
+import static java.util.concurrent.TimeUnit.MINUTES;
+import com.google.common.graph.GraphBuilder;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -35,17 +33,19 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
-import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.graph.MutableGraph;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -65,6 +65,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 /**
  * Test RocksDBCheckpointDiffer basic functionality.
  */
@@ -84,19 +92,10 @@ public class TestRocksDBCheckpointDiffer {
    * RocksDB checkpoint path prefix.
    */
   private static final String CP_PATH_PREFIX = "rocksdb-cp-";
-  private final ArrayList<DifferSnapshotInfo> snapshots = new ArrayList<>();
-
-  /**
-   * Graph type.
-   */
-  enum GType {
-    FNAME,
-    KEYSIZE,
-    CUMUTATIVE_SIZE
-  }
+  private final List<DifferSnapshotInfo> snapshots = new ArrayList<>();
 
-  @BeforeAll
-  public static void init() {
+  @BeforeEach
+  public void init() {
     // Checkpoint differ log level. Set to DEBUG for verbose output
     GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.getLog(), Level.INFO);
     // Test class log level. Set to DEBUG for verbose output
@@ -203,25 +202,33 @@ public class TestRocksDBCheckpointDiffer {
       Set<String> expectedDiffSstFiles,
       boolean expectingException) {
 
-    RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer();
+    RocksDBCheckpointDiffer differ =
+        new RocksDBCheckpointDiffer(null, null, null, 0L);
     boolean exceptionThrown = false;
+    long createdTime = System.currentTimeMillis();
 
     String compactionLog = ""
-        + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5\n"  // Snapshot 0
+        // Snapshot 0
+        + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5 " + createdTime + " \n"
+        // Additional "compaction" to trigger and test early exit condition
         + "C 000001,000002:000062\n"
-          // Additional "compaction" to trigger and test early exit condition
-        + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5\n"  // Snapshot 1
-        + "C 000068,000062:000069\n"  // Regular compaction
+        // Snapshot 1
+        + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5 " + createdTime + " \n"
+        // Regular compaction
+        + "C 000068,000062:000069\n"
+        // Trivial move
         + "C 000071,000064,000060,000052:000071,000064,000060,000052\n"
-          // Trivial move
         + "C 000073,000066:000074\n"
         + "C 000082,000076,000069:000083\n"
         + "C 000087,000080,000074:000088\n"
-        + "C 000093,000090,000083:\n"  // Deletion?
-        + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd\n"  // Snapshot 2
+        // Deletion?
+        + "C 000093,000090,000083:\n"
+        // Snapshot 2
+        + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd " + createdTime + "\n"
         + "C 000098,000096,000085,000078,000071,000064,000060,000052:000099\n"
         + "C 000105,000095,000088:000107\n"
-        + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea\n";  // Snapshot 3
+        // Snapshot 3
+        + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea " + createdTime + "\n";
 
     // Construct DAG from compaction log input
     Arrays.stream(compactionLog.split("\n")).forEach(
@@ -277,7 +284,9 @@ public class TestRocksDBCheckpointDiffer {
 
     final File dbLocation = new File(TEST_DB_PATH);
     RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
-        metadataDirStr, sstDirStr, clDirStr, dbLocation);
+        metadataDirStr, sstDirStr, clDirStr, dbLocation,
+        TimeUnit.DAYS.toMillis(1),
+        MINUTES.toMillis(5));
 
     // Empty the SST backup folder first for testing
     File sstDir = new File(sstDirStr);
@@ -312,32 +321,9 @@ public class TestRocksDBCheckpointDiffer {
       differ.dumpCompactionNodeTable();
     }
 
-    for (GType gtype : GType.values()) {
-      String fname = "fwdGraph_" + gtype +  ".png";
-      String rname = "reverseGraph_" + gtype + ".png";
-/*
-      differ.pngPrintMutableGrapth(differ.getCompactionFwdDAG(), fname, gtype);
-      differ.pngPrintMutableGrapth(
-          differ.getCompactionReverseDAG(), rname, gtype);
- */
-    }
-
     rocksDB.close();
   }
 
-  private String getRandomString(Random random, int length) {
-    // Ref: https://www.baeldung.com/java-random-string
-    final int leftLimit = 48; // numeral '0'
-    final int rightLimit = 122; // letter 'z'
-
-    return random.ints(leftLimit, rightLimit + 1)
-        .filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
-        .limit(length)
-        .collect(StringBuilder::new,
-            StringBuilder::appendCodePoint, StringBuilder::append)
-        .toString();
-  }
-
   /**
    * Test SST differ.
    */
@@ -396,8 +382,9 @@ public class TestRocksDBCheckpointDiffer {
     this.snapshots.add(currentSnapshot);
 
     // Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
-    differ.appendSequenceNumberToCompactionLog(
-        dbLatestSequenceNumber, snapshotId);
+    differ.appendSnapshotInfoToCompactionLog(dbLatestSequenceNumber,
+        snapshotId,
+        System.currentTimeMillis());
 
     differ.setCurrentCompactionLog(dbLatestSequenceNumber);
 
@@ -453,10 +440,9 @@ public class TestRocksDBCheckpointDiffer {
 
     differ.setCurrentCompactionLog(rocksDB.getLatestSequenceNumber());
 
-    Random random = new Random();
     // key-value
     for (int i = 0; i < NUM_ROW; ++i) {
-      String generatedString = getRandomString(random, 7);
+      String generatedString = RandomStringUtils.randomAlphabetic(7);
       String keyStr = "Key-" + i + "-" + generatedString;
       String valueStr = "Val-" + i + "-" + generatedString;
       byte[] key = keyStr.getBytes(UTF_8);
@@ -678,4 +664,440 @@ public class TestRocksDBCheckpointDiffer {
 
     LOG.debug("Files are: {}", allNodesSet);
   }
-}
\ No newline at end of file
+
+  private static final List<List<String>> SST_FILES_BY_LEVEL = Arrays.asList(
+      Arrays.asList("000015", "000013", "000011", "000009"),
+      Arrays.asList("000018", "000016", "000017", "000026", "000024", "000022",
+          "000020"),
+      Arrays.asList("000027", "000030", "000028", "000029", "000031", "000039",
+          "000037", "000035", "000033"),
+      Arrays.asList("000040", "000044", "000042", "000043", "000045", "000041",
+          "000046", "000054", "000052", "000050", "000048"),
+      Arrays.asList("000059", "000055", "000056", "000060", "000057", "000058")
+  );
+
+  private static final List<List<CompactionNode>> COMPACTION_NODES_BY_LEVEL =
+      SST_FILES_BY_LEVEL.stream()
+          .map(sstFiles ->
+              sstFiles.stream()
+                  .map(
+                      sstFile -> new CompactionNode(sstFile,
+                          UUID.randomUUID().toString(),
+                          1000L,
+                          Long.parseLong(sstFile.substring(0, 6))
+                      ))
+                  .collect(Collectors.toList()))
+          .collect(Collectors.toList());
+
+  /**
+   * Creates a backward compaction DAG from a list of level nodes.
+   * It assumes that at each level, files get compacted into half of the
+   * number of files at the next level.
+   * e.g. if level-1 has 7 files and level-2 has 9 files, the first 4 files
+   * at level-2 are from the compaction of level-1 and the rest are new.
+   */
+  private static MutableGraph<CompactionNode> createBackwardDagFromLevelNodes(
+      int fromLevel,
+      int toLevel
+  ) {
+    MutableGraph<CompactionNode> dag  = GraphBuilder.directed().build();
+
+    if (fromLevel == toLevel) {
+      COMPACTION_NODES_BY_LEVEL.get(fromLevel).forEach(dag::addNode);
+      return dag;
+    }
+
+    for (int level = fromLevel; level < toLevel; level++) {
+      List<CompactionNode> currentLevel = COMPACTION_NODES_BY_LEVEL.get(level);
+      List<CompactionNode> nextLevel = COMPACTION_NODES_BY_LEVEL.get(level + 1);
+
+      for (int i = 0; i < currentLevel.size(); i++) {
+        for (int j = 0; j < nextLevel.size(); j++) {
+          dag.addNode(currentLevel.get(i));
+          dag.addNode(nextLevel.get(j));
+
+          int child = nextLevel.size();
+          if (level < COMPACTION_NODES_BY_LEVEL.size() - 2) {
+            child /= 2;
+          }
+
+          if (j < child) {
+            dag.putEdge(currentLevel.get(i), nextLevel.get(j));
+          }
+        }
+      }
+    }
+
+    return dag;
+  }
+
+  /**
+   * Creates a forward compaction DAG from a list of level nodes.
+   * It assumes that at each level, the first half of the files are from the
+   * compaction of the previous level.
+   * e.g. if level-1 has 7 files and level-2 has 9 files, the first 4 files
+   * at level-2 are from the compaction of level-1 and the rest are new.
+   */
+  private static MutableGraph<CompactionNode> createForwardDagFromLevelNodes(
+      int fromLevel,
+      int toLevel
+  ) {
+    MutableGraph<CompactionNode> dag  = GraphBuilder.directed().build();
+
+    if (fromLevel == toLevel) {
+      COMPACTION_NODES_BY_LEVEL.get(fromLevel).forEach(dag::addNode);
+      return dag;
+    }
+
+    dag  = GraphBuilder.directed().build();
+    for (int level = fromLevel; level > toLevel; level--) {
+      List<CompactionNode> currentLevel = COMPACTION_NODES_BY_LEVEL.get(level);
+      List<CompactionNode> nextLevel = COMPACTION_NODES_BY_LEVEL.get(level - 1);
+
+      for (int i = 0; i < currentLevel.size(); i++) {
+        for (int j = 0; j < nextLevel.size(); j++) {
+          dag.addNode(currentLevel.get(i));
+          dag.addNode(nextLevel.get(j));
+
+          int parent = currentLevel.size();
+          if (level < COMPACTION_NODES_BY_LEVEL.size() - 1) {
+            parent /= 2;
+          }
+
+          if (i < parent) {
+            dag.putEdge(currentLevel.get(i), nextLevel.get(j));
+          }
+        }
+      }
+    }
+
+    return dag;
+  }
+
+  /**
+   * Test cases for pruneBackwardDag.
+   */
+  private static Stream<Arguments> pruneBackwardDagScenarios() {
+    Set<String> level0Files = new HashSet<>(SST_FILES_BY_LEVEL.get(0));
+    Set<String> level1Files = new HashSet<>(SST_FILES_BY_LEVEL.get(1));
+    Set<String> level2Files = new HashSet<>(SST_FILES_BY_LEVEL.get(2));
+    Set<String> level3Files = new HashSet<>(SST_FILES_BY_LEVEL.get(3));
+
+    level1Files.addAll(level0Files);
+    level2Files.addAll(level1Files);
+    level3Files.addAll(level2Files);
+
+    return Stream.of(
+        Arguments.of("Remove level 0 from backward DAG",
+            createBackwardDagFromLevelNodes(0, 4),
+            new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(0)),
+            createBackwardDagFromLevelNodes(1, 4),
+            level0Files
+        ),
+        Arguments.of("Remove level 1 from backward DAG",
+            createBackwardDagFromLevelNodes(0, 4),
+            new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(1)),
+            createBackwardDagFromLevelNodes(2, 4),
+            level1Files
+        ),
+        Arguments.of("Remove level 2 from backward DAG",
+            createBackwardDagFromLevelNodes(0, 4),
+            new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(2)),
+            createBackwardDagFromLevelNodes(3, 4),
+            level2Files
+        ),
+        Arguments.of("Remove level 3 from backward DAG",
+            createBackwardDagFromLevelNodes(0, 4),
+            new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(3)),
+            createBackwardDagFromLevelNodes(4, 4),
+            level3Files
+        )
+    );
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("pruneBackwardDagScenarios")
+  public void testPruneBackwardDag(String description,
+                                   MutableGraph<CompactionNode> originalDag,
+                                   Set<CompactionNode> levelToBeRemoved,
+                                   MutableGraph<CompactionNode> expectedDag,
+                                   Set<String> expectedFileNodesRemoved) {
+
+    RocksDBCheckpointDiffer differ =
+        new RocksDBCheckpointDiffer(null, null, null, 0L);
+    Set<String> actualFileNodesRemoved =
+        differ.pruneBackwardDag(originalDag, levelToBeRemoved);
+    Assertions.assertEquals(expectedDag, originalDag);
+    Assertions.assertEquals(expectedFileNodesRemoved, actualFileNodesRemoved);
+  }
+
+
+  /**
+   * Test cases for pruneForwardDag.
+   */
+  private static Stream<Arguments> pruneForwardDagScenarios() {
+    Set<String> level0Files = new HashSet<>(SST_FILES_BY_LEVEL.get(0));
+    Set<String> level1Files = new HashSet<>(SST_FILES_BY_LEVEL.get(1));
+    Set<String> level2Files = new HashSet<>(SST_FILES_BY_LEVEL.get(2));
+    Set<String> level3Files = new HashSet<>(SST_FILES_BY_LEVEL.get(3));
+
+    level1Files.addAll(level0Files);
+    level2Files.addAll(level1Files);
+    level3Files.addAll(level2Files);
+
+    return Stream.of(
+        Arguments.of("Remove level 0 from forward DAG",
+            createForwardDagFromLevelNodes(4, 0),
+            new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(0)),
+            createForwardDagFromLevelNodes(4, 1),
+            level0Files
+        ),
+        Arguments.of("Remove level 1 from forward DAG",
+            createForwardDagFromLevelNodes(4, 0),
+            new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(1)),
+            createForwardDagFromLevelNodes(4, 2),
+            level1Files
+        ),
+        Arguments.of("Remove level 2 from forward DAG",
+            createForwardDagFromLevelNodes(4, 0),
+            new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(2)),
+            createForwardDagFromLevelNodes(4, 3),
+            level2Files
+        ),
+        Arguments.of("Remove level 3 from forward DAG",
+            createForwardDagFromLevelNodes(4, 0),
+            new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(3)),
+            createForwardDagFromLevelNodes(4, 4),
+            level3Files
+        )
+    );
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("pruneForwardDagScenarios")
+  public void testPruneForwardDag(String description,
+                                  MutableGraph<CompactionNode> originalDag,
+                                  Set<CompactionNode> levelToBeRemoved,
+                                  MutableGraph<CompactionNode> expectedDag,
+                                  Set<String> expectedFileNodesRemoved) {
+
+    RocksDBCheckpointDiffer differ =
+        new RocksDBCheckpointDiffer(null, null, null, 0L);
+    Set<String> actualFileNodesRemoved =
+        differ.pruneForwardDag(originalDag, levelToBeRemoved);
+    Assertions.assertEquals(expectedDag, originalDag);
+    Assertions.assertEquals(expectedFileNodesRemoved, actualFileNodesRemoved);
+  }
+
+  private static Stream<Arguments> compactionDagPruningScenarios() {
+    long currentTimeMillis = System.currentTimeMillis();
+
+    String compactionLogFile0 = "S 1000 snapshotId0 " +
+        (currentTimeMillis - MINUTES.toMillis(30)) + " \n";
+    String compactionLogFile1 = "C 000015,000013,000011,000009:000018,000016," +
+        "000017\n"
+        + "S 2000 snapshotId1 " +
+        (currentTimeMillis - MINUTES.toMillis(24)) + " \n";
+
+    String compactionLogFile2 = "C 000018,000016,000017,000026,000024,000022," +
+        "000020:000027,000030,000028,000031,000029\n"
+        + "S 3000 snapshotId2 " +
+        (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
+
+    String compactionLogFile3 = "C 000027,000030,000028,000031,000029,000039," +
+        "000037,000035,000033:000040,000044,000042,000043,000046,000041," +
+        "000045\n"
+        + "S 3000 snapshotId3 " +
+        (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
+
+    String compactionLogFile4 = "C 000040,000044,000042,000043,000046,000041," +
+        "000045,000054,000052,000050,000048:000059,000055,000056,000060," +
+        "000057,000058\n"
+        + "S 3000 snapshotId4 " +
+        (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
+
+    String compactionLogFileWithoutSnapshot1 = "C 000015,000013,000011," +
+        "000009:000018,000016,000017\n" +
+        "C 000018,000016,000017,000026,000024,000022,000020:000027,000030," +
+        "000028,000031,000029\n";
+
+    String compactionLogFileWithoutSnapshot2 = "C 000027,000030,000028," +
+        "000031,000029,000039,000037,000035,000033:000040,000044,000042," +
+        "000043,000046,000041,000045\n";
+
+    String compactionLogFileWithoutSnapshot3 = "C 000040,000044,000042," +
+        "000043,000046,000041,000045,000054,000052,000050,000048:000059," +
+        "000055,000056,000060,000057,000058\n";
+
+    String compactionLogFileOnlyWithSnapshot1 =
+        "S 3000 snapshotIdWithoutCompaction1 " +
+            (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
+
+    String compactionLogFileOnlyWithSnapshot2 =
+        "S 3000 snapshotIdWithoutCompaction2 " +
+            (currentTimeMillis - MINUTES.toMillis(15)) + " \n";
+
+    String compactionLogFileOnlyWithSnapshot3 =
+        "S 3000 snapshotIdWithoutCompaction3 " +
+            (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
+
+    String compactionLogFileOnlyWithSnapshot4 =
+        "S 3000 snapshotIdWithoutCompaction4 " +
+            (currentTimeMillis - MINUTES.toMillis(9)) + " \n";
+
+    String compactionLogFileOnlyWithSnapshot5 =
+        "S 3000 snapshotIdWithoutCompaction5 " +
+            (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
+
+    String compactionLogFileOnlyWithSnapshot6 =
+        "S 3000 snapshotIdWithoutCompaction6 " +
+            (currentTimeMillis - MINUTES.toMillis(3)) + " \n";
+
+    Set<String> expectedNodes = new HashSet<>(
+        Arrays.asList("000054", "000052", "000050", "000048", "000059",
+            "000055", "000056", "000060", "000057", "000058")
+    );
+
+    Set<String> expectedAllNodes = new HashSet<>(
+        Arrays.asList("000058", "000013", "000035", "000057", "000056",
+            "000011", "000033", "000055", "000018", "000017", "000039",
+            "000016", "000015", "000037", "000059", "000060", "000043",
+            "000020", "000042", "000041", "000040", "000024", "000046",
+            "000045", "000022", "000044", "000029", "000028", "000027",
+            "000026", "000048", "000009", "000050", "000054", "000031",
+            "000030", "000052")
+    );
+
+    return Stream.of(
+        Arguments.of("Each compaction log file has only one snapshot and one" +
+                " compaction statement except first log file.",
+            Arrays.asList(compactionLogFile0, compactionLogFile1,
+                compactionLogFile2, compactionLogFile3, compactionLogFile4),
+            expectedNodes,
+            4
+        ),
+        Arguments.of("Compaction log doesn't have snapshot  because OM" +
+                " restarted. Restart happened before snapshot to be deleted.",
+            Arrays.asList(compactionLogFile0,
+                compactionLogFileWithoutSnapshot1,
+                compactionLogFile3,
+                compactionLogFile4),
+            expectedNodes,
+            3
+        ),
+        Arguments.of("Compaction log doesn't have snapshot because OM" +
+                " restarted. Restart happened after snapshot to be deleted.",
+            Arrays.asList(compactionLogFile0, compactionLogFile1,
+                compactionLogFile2, compactionLogFile3,
+                compactionLogFileWithoutSnapshot3,
+                compactionLogFileOnlyWithSnapshot4),
+            expectedNodes,
+            4
+        ),
+        Arguments.of("No compaction happened in between two snapshots.",
+            Arrays.asList(compactionLogFile0, compactionLogFile1,
+                compactionLogFile2, compactionLogFile3,
+                compactionLogFileOnlyWithSnapshot1,
+                compactionLogFileOnlyWithSnapshot2, compactionLogFile4),
+            expectedNodes,
+            6
+        ),
+        Arguments.of("No snapshot is taken and only one compaction log file,",
+            Collections.singletonList(compactionLogFileWithoutSnapshot1 +
+                compactionLogFileWithoutSnapshot2 +
+                compactionLogFileWithoutSnapshot3),
+            expectedAllNodes,
+            0
+        ),
+        Arguments.of("No snapshot is taken but multiple compaction files" +
+                " because of OM restart.",
+            Arrays.asList(compactionLogFileWithoutSnapshot1,
+                compactionLogFileWithoutSnapshot2,
+                compactionLogFileWithoutSnapshot3),
+            expectedAllNodes,
+            0
+        ),
+        Arguments.of("Only contains snapshots but no compaction.",
+            Arrays.asList(compactionLogFileOnlyWithSnapshot1,
+                compactionLogFileOnlyWithSnapshot2,
+                compactionLogFileOnlyWithSnapshot3,
+                compactionLogFileOnlyWithSnapshot4,
+                compactionLogFileOnlyWithSnapshot5,
+                compactionLogFileOnlyWithSnapshot6),
+            Collections.emptySet(),
+            3
+        ),
+        Arguments.of("No file exists because compaction has not happened" +
+                " and snapshot is not taken.",
+            Collections.emptyList(),
+            Collections.emptySet(),
+            0
+        )
+    );
+  }
+
+  /**
+   * End-to-end test for snapshot's compaction history pruning.
+   */
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("compactionDagPruningScenarios")
+  public void testPruneOlderSnapshotsWithCompactionHistory(
+      String description,
+      List<String> compactionLogs,
+      Set<String> expectedNodes,
+      int expectedNumberOfLogFilesDeleted
+  ) throws IOException {
+    String compactionLogDirName = "./test-compaction-log";
+    File compactionLogDir = new File(compactionLogDirName);
+    if (!compactionLogDir.exists() && !compactionLogDir.mkdirs()) {
+      fail("Error creating compaction log directory: " + compactionLogDirName);
+    }
+
+    String sstBackUpDirName = "./test-compaction-sst-backup";
+    File sstBackUpDir = new File(sstBackUpDirName);
+    if (!sstBackUpDir.exists() && !sstBackUpDir.mkdirs()) {
+      fail("Error creating SST backup directory: " + sstBackUpDirName);
+    }
+
+    List<File> filesCreated = new ArrayList<>();
+
+    for (int i = 0; i < compactionLogs.size(); i++) {
+      String compactionFileName =
+          compactionLogDirName + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
+      File compactionFile = new File(compactionFileName);
+      Files.write(compactionFile.toPath(),
+          compactionLogs.get(i).getBytes(UTF_8));
+      filesCreated.add(compactionFile);
+    }
+
+    RocksDBCheckpointDiffer differ =
+        new RocksDBCheckpointDiffer(sstBackUpDirName,
+            compactionLogDirName,
+            null,
+            MINUTES.toMillis(10));
+
+    differ.loadAllCompactionLogs();
+
+    differ.pruneOlderSnapshotsWithCompactionHistory();
+
+    Set<String> actualNodes = differ.getForwardCompactionDAG().nodes().stream()
+        .map(CompactionNode::getFileName)
+        .collect(Collectors.toSet());
+
+    assertEquals(expectedNodes, actualNodes);
+
+    for (int i = 0; i < expectedNumberOfLogFilesDeleted; i++) {
+      File compactionFile = filesCreated.get(i);
+      assertFalse(compactionFile.exists());
+    }
+
+    for (int i = expectedNumberOfLogFilesDeleted; i < compactionLogs.size();
+         i++) {
+      File compactionFile = filesCreated.get(i);
+      assertTrue(compactionFile.exists());
+    }
+
+    deleteDirectory(compactionLogDir);
+    deleteDirectory(sstBackUpDir);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 45bdafee7d..bbf0939a2a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -150,8 +150,9 @@ public final class OmSnapshotManager {
 
     // Write snapshot generation (latest sequence number) to compaction log.
     // This will be used for DAG reconstruction as snapshotGeneration.
-    dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber,
-        snapshotInfo.getSnapshotID());
+    dbCpDiffer.appendSnapshotInfoToCompactionLog(dbLatestSequenceNumber,
+        snapshotInfo.getSnapshotID(),
+        snapshotInfo.getCreationTime());
 
     // Set compaction log filename to the latest DB sequence number
     // right after taking the RocksDB checkpoint for Ozone snapshot.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
