smengcl commented on code in PR #4045:
URL: https://github.com/apache/ozone/pull/4045#discussion_r1047859221


##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3407,4 +3407,22 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.snapshot.compaction.dag.max.time.allowed.ms</name>
+    <value>30d</value>

Review Comment:
   Is `30d` correctly parsed into millis when actually read from 
`ozone-site.xml`? Looks to be the case, just need to double check.



##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3407,4 +3407,22 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.snapshot.compaction.dag.max.time.allowed.ms</name>
+    <value>30d</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      Maximum time a snapshot is allowed to be in compaction DAG before it 
gets pruned out by pruning daemon.

Review Comment:
   ```suggestion
         Maximum time a snapshot is allowed to be in compaction DAG before it 
gets pruned out by pruning daemon. Uses millisecond by default when no time 
unit is specified.
   ```
   
   Same below.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java:
##########
@@ -145,6 +152,16 @@ private DBStoreBuilder(ConfigurationSource configuration,
     defaultCfProfile = this.configuration.getEnum(HDDS_DB_PROFILE,
           HDDS_DEFAULT_DB_PROFILE);
     LOG.debug("Default DB profile:{}", defaultCfProfile);
+
+    maxTimeAllowedForSnapshotInDag = configuration.getTimeDuration(
+        OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_MS,
+        OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_MS_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    pruneCompactionDagDaemonRunInterval = configuration.getTimeDuration(
+        OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL_MS,
+        OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_MS_DEFAULT,
+        TimeUnit.MILLISECONDS);

Review Comment:
   I'm fine with reading the DAG configs here in `DBStoreBuilder` for now.
   
   Those configs IMO are more closely tied to `RocksDBCheckpointDiffer` and 
could be read inside `RDBStore` in the future when we pass `configuration` into 
it.



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java:
##########
@@ -534,6 +534,23 @@ public final class OzoneConfigKeys {
   public static final String OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE =
       "ozone.om.snapshot.cache.max.size";
   public static final int OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT = 10;
+
+  public static final String
+      OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_MS =
+      "ozone.om.snapshot.compaction.dag.max.time.allowed.ms";

Review Comment:
   Since we are using the unit suffix I think we can drop the `_MS` suffix here.
   
   ```suggestion
         OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED =
         "ozone.om.snapshot.compaction.dag.max.time.allowed";
   ```
   
   Same for the rest.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -600,13 +638,14 @@ synchronized void processCompactionLogLine(String line) {
       // Read sequence number, and snapshot ID
       LOG.debug("Reading sequence number as snapshot generation, "
           + "and snapshot ID");
-      final String trimmedStr =
-          line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
-      final Scanner input = new Scanner(trimmedStr);
-      // This would the snapshot generation for the nodes to come
-      reconstructionSnapshotGeneration = input.nextLong();
-      // This is the snapshotID assigned to every single CompactionNode to come
-      reconstructionLastSnapshotID = input.nextLine().trim();
+      String[] splits = line.split(" ");
+      assert (splits.length == 5);
+
+      reconstructionSnapshotGeneration = Long.parseLong(splits[1]);
+      reconstructionLastSnapshotID = splits[2];
+      String snapshotDir = splits[3];
+      long createdAt = Long.parseLong(splits[4]);
+      snapshots.add(Pair.of(createdAt, snapshotDir));

Review Comment:
   ```suggestion
         // TODO: TODO: Use global snapshot chain instead when implemented
         snapshots.add(Pair.of(createdAt, snapshotDir));
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +978,114 @@ private void populateCompactionDAG(List<String> 
inputFiles,
 
   }
 
+  /**
+   * This is the task definition which is run periodically by the service
+   * executor at fixed delay.
+   * It looks for snapshots in compaction DAG which are older than the allowed
+   * time to be in compaction DAG and removes them from the DAG.
+   */
+  public void pruneOlderSnapshotsWithCompactionHistory() {
+    String snapshotDir = null;
+    long currentTimeMillis = System.currentTimeMillis();
+
+    while (!snapshots.isEmpty() &&
+        (currentTimeMillis - snapshots.peek().getLeft())
+            > maxAllowedTimeInDag) {
+      snapshotDir = snapshots.poll().getRight();
+    }
+
+    if (snapshotDir != null) {
+      pruneSnapshotFileNodesFromDag(snapshotDir);
+    }
+  }
+
+  /**
+   * Prunes forward and backward DAGs when oldest snapshot with compaction
+   * history gets deleted.
+   */
+  public void pruneSnapshotFileNodesFromDag(String snapshotDir) {
+    Set<String> snapshotSstFiles = readRocksDBLiveFiles(snapshotDir);
+    if (snapshotSstFiles.isEmpty()) {
+      LOG.info("Snapshot '{}' doesn't have any sst file to remove.",
+          snapshotDir);
+      return;
+    }
+
+    Set<CompactionNode> startNodes = new HashSet<>();
+    for (String sstFile: snapshotSstFiles) {

Review Comment:
   ```suggestion
       for (String sstFile : snapshotSstFiles) {
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +978,114 @@ private void populateCompactionDAG(List<String> 
inputFiles,
 
   }
 
+  /**
+   * This is the task definition which is run periodically by the service
+   * executor at fixed delay.
+   * It looks for snapshots in compaction DAG which are older than the allowed
+   * time to be in compaction DAG and removes them from the DAG.
+   */
+  public void pruneOlderSnapshotsWithCompactionHistory() {
+    String snapshotDir = null;
+    long currentTimeMillis = System.currentTimeMillis();
+
+    while (!snapshots.isEmpty() &&
+        (currentTimeMillis - snapshots.peek().getLeft())
+            > maxAllowedTimeInDag) {
+      snapshotDir = snapshots.poll().getRight();
+    }
+
+    if (snapshotDir != null) {
+      pruneSnapshotFileNodesFromDag(snapshotDir);
+    }
+  }
+
+  /**
+   * Prunes forward and backward DAGs when oldest snapshot with compaction
+   * history gets deleted.
+   */
+  public void pruneSnapshotFileNodesFromDag(String snapshotDir) {
+    Set<String> snapshotSstFiles = readRocksDBLiveFiles(snapshotDir);
+    if (snapshotSstFiles.isEmpty()) {
+      LOG.info("Snapshot '{}' doesn't have any sst file to remove.",

Review Comment:
   This case is weird enough that might deserve a `warn` or `error`. Possibly 
this DB checkpoint directory is corrupted or tampered with.
   
   ```suggestion
         LOG.error("Snapshot '{}' DB checkpoint doesn't have any SST file.",
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -309,13 +350,15 @@ private synchronized void 
appendToCurrentCompactionLog(String content) {
   /**
    * Append a sequence number to the compaction log (roughly) when an Ozone
    * snapshot (RDB checkpoint) is taken.
-   * @param sequenceNum RDB sequence number
    */
   public void appendSequenceNumberToCompactionLog(long sequenceNum,
-      String snapshotID) {
+                                                  String snapshotID,
+                                                  String snapshotDir,
+                                                  long creationTime) {
     final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum +
-        " " + snapshotID + "\n";
+        " " + snapshotID + " " + snapshotDir + " " + creationTime + "\n";
     appendToCurrentCompactionLog(line);
+    snapshots.add(Pair.of(creationTime, snapshotDir));

Review Comment:
   ```suggestion
       // TODO: Use global snapshot chain instead when implemented
       snapshots.add(Pair.of(creationTime, snapshotDir));
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -148,17 +153,36 @@ public class RocksDBCheckpointDiffer {
    */
   private boolean skipGetSSTFileSummary = false;
 
+  /**
+   * A queue which keeps the snapshot in shorted order of their creations time.
+   * It is used by DAG pruning daemon to remove snapshots older than allowed
+   * time in compaction DAG.
+   */
+  private final Queue<Pair<Long, String>> snapshots = new LinkedList<>();

Review Comment:
   This is fine for now. Note when global snapshot chain is utilized we could 
simply use that instead. (Snapshot chain is implemented in #3658 but not in-use 
yet.)



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -174,6 +198,14 @@ public RocksDBCheckpointDiffer(String metadataDir, String 
sstBackupDir,
 
     // Active DB location is used in getSSTFileSummary
     this.activeDBLocationStr = activeDBLocation.toString() + "/";
+
+    this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
+    this.executor = Executors.newSingleThreadScheduledExecutor();
+    this.executor.scheduleWithFixedDelay(
+        this::pruneOlderSnapshotsWithCompactionHistory,
+        0,
+        pruneCompactionDagDaemonRunIntervalInMs,
+        TimeUnit.MILLISECONDS);

Review Comment:
   This would work. Though it is expected to use `BackgroundTask` rather than 
using `Executors` directly unless inapplicable. See other tasks that `extends 
BackgroundService` for examples.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -600,13 +638,14 @@ synchronized void processCompactionLogLine(String line) {
       // Read sequence number, and snapshot ID
       LOG.debug("Reading sequence number as snapshot generation, "
           + "and snapshot ID");
-      final String trimmedStr =
-          line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
-      final Scanner input = new Scanner(trimmedStr);
-      // This would the snapshot generation for the nodes to come
-      reconstructionSnapshotGeneration = input.nextLong();
-      // This is the snapshotID assigned to every single CompactionNode to come
-      reconstructionLastSnapshotID = input.nextLine().trim();
+      String[] splits = line.split(" ");
+      assert (splits.length == 5);

Review Comment:
   Hmm looks like it isn't handling the case where `snapshotDir` might have 
space in its path.
   
   On the other hand, I forgot whether `snapshotDir` stores relative path or 
full path. If it is the former, we are in full control of the path so I think 
it is not an issue.
   
   If `snapshotDir` is the full path, we can't be sure if Ozone metadata is 
placed in a directory path without space(s). And this can also become a problem 
when we move Ozone metadata around. Will need to be fixed if so.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -148,17 +153,36 @@ public class RocksDBCheckpointDiffer {
    */
   private boolean skipGetSSTFileSummary = false;
 
+  /**
+   * A queue which keeps the snapshot in shorted order of their creations time.

Review Comment:
   ```suggestion
      * A queue which keeps the snapshots in sorted order by their creation 
time.
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +978,114 @@ private void populateCompactionDAG(List<String> 
inputFiles,
 
   }
 
+  /**
+   * This is the task definition which is run periodically by the service
+   * executor at fixed delay.
+   * It looks for snapshots in compaction DAG which are older than the allowed
+   * time to be in compaction DAG and removes them from the DAG.
+   */
+  public void pruneOlderSnapshotsWithCompactionHistory() {
+    String snapshotDir = null;
+    long currentTimeMillis = System.currentTimeMillis();
+
+    while (!snapshots.isEmpty() &&
+        (currentTimeMillis - snapshots.peek().getLeft())
+            > maxAllowedTimeInDag) {
+      snapshotDir = snapshots.poll().getRight();
+    }
+
+    if (snapshotDir != null) {
+      pruneSnapshotFileNodesFromDag(snapshotDir);
+    }
+  }
+
+  /**
+   * Prunes forward and backward DAGs when oldest snapshot with compaction
+   * history gets deleted.
+   */
+  public void pruneSnapshotFileNodesFromDag(String snapshotDir) {
+    Set<String> snapshotSstFiles = readRocksDBLiveFiles(snapshotDir);
+    if (snapshotSstFiles.isEmpty()) {
+      LOG.info("Snapshot '{}' doesn't have any sst file to remove.",
+          snapshotDir);
+      return;
+    }
+
+    Set<CompactionNode> startNodes = new HashSet<>();
+    for (String sstFile: snapshotSstFiles) {
+      CompactionNode infileNode = compactionNodeMap.get(sstFile);
+      if (infileNode == null) {
+        LOG.error("Compaction node doesn't exist for sstFile: {}.", sstFile);
+        continue;
+      }
+
+      startNodes.add(infileNode);
+    }
+
+    pruneBackwardDag(backwardCompactionDAG, startNodes);
+    Set<String> sstFilesPruned = pruneForwardDag(forwardCompactionDAG,
+        startNodes);
+
+    // Remove SST file nodes from compactionNodeMap too,
+    // since those nodes won't be needed after clean up.
+    sstFilesPruned.forEach(compactionNodeMap::remove);
+
+    LOG.info("Pruned SST nodes from DAG: {}.", sstFilesPruned);
+  }
+
+  /**
+   * Prunes backward DAG's upstream from the level, that needs to be removed.
+   */
+  @VisibleForTesting
+  Set<String> pruneBackwardDag(MutableGraph<CompactionNode> backwardDag,
+                               Set<CompactionNode> startNodes) {
+    Set<String> removedFiles = new HashSet<>();
+    Set<CompactionNode> currentLevel = startNodes;
+
+    while (!currentLevel.isEmpty()) {
+      Set<CompactionNode> nextLevel = new HashSet<>();
+      for (CompactionNode current : currentLevel) {
+        if (!backwardDag.nodes().contains(current)) {
+          continue;

Review Comment:
   This condition wouldn't be entered at all if the graph is indeed a DAG 
right? (no loop)
   
   Is this just a sanity check? Same in `pruneForwardDag`.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -184,6 +216,8 @@ public RocksDBCheckpointDiffer(String metadataDir, String 
sstBackupDir,
     this.skipGetSSTFileSummary = true;
     this.sstBackupDir = null;
     this.activeDBLocationStr = null;
+    this.executor = null;
+    this.maxAllowedTimeInDag = 0;

Review Comment:
   nit
   ```suggestion
       this.maxAllowedTimeInDag = 0L;
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java:
##########
@@ -369,6 +355,55 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ) {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(ints = {0, 1, 2, 3, 4, 5})
+  public void testRemoveSnapshotFromDag(int snapshotIndex) throws Exception {
+    String clDirStr = "compaction-log";
+    String metadataDirStr = ".";
+    String sstDirStr = "compaction-sst-backup";
+    List<List<String>> expectedDiff = asList(
+        asList("000024", "000017", "000028", "000026", "000019", "000021"),
+        asList("000024", "000028", "000026", "000019", "000021"),
+        asList("000024", "000028", "000026", "000021"),
+        asList("000024", "000028", "000026"),
+        asList("000028", "000026"),
+        Collections.singletonList("000028"),
+        Collections.emptyList()

Review Comment:
   Is this line (index 6) used anywhere?



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -309,13 +350,15 @@ private synchronized void 
appendToCurrentCompactionLog(String content) {
   /**
    * Append a sequence number to the compaction log (roughly) when an Ozone
    * snapshot (RDB checkpoint) is taken.
-   * @param sequenceNum RDB sequence number
    */
   public void appendSequenceNumberToCompactionLog(long sequenceNum,
-      String snapshotID) {
+                                                  String snapshotID,
+                                                  String snapshotDir,
+                                                  long creationTime) {
     final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum +
-        " " + snapshotID + "\n";
+        " " + snapshotID + " " + snapshotDir + " " + creationTime + "\n";

Review Comment:
   What if `snapshotDir` has space in its path? Could it happen?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to