hemantk-12 commented on code in PR #4045:
URL: https://github.com/apache/ozone/pull/4045#discussion_r1049039941


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -309,13 +350,15 @@ private synchronized void appendToCurrentCompactionLog(String content) {
   /**
    * Append a sequence number to the compaction log (roughly) when an Ozone
    * snapshot (RDB checkpoint) is taken.
-   * @param sequenceNum RDB sequence number
    */
   public void appendSequenceNumberToCompactionLog(long sequenceNum,
-      String snapshotID) {
+                                                  String snapshotID,
+                                                  String snapshotDir,
+                                                  long creationTime) {
     final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum +
-        " " + snapshotID + "\n";
+        " " + snapshotID + " " + snapshotDir + " " + creationTime + "\n";
     appendToCurrentCompactionLog(line);
+    snapshots.add(Pair.of(creationTime, snapshotDir));

Review Comment:
   Once we start using the global snapshot chain, `snapshots` will be removed, 
and this line along with it. No need to add a TODO.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -600,13 +638,14 @@ synchronized void processCompactionLogLine(String line) {
       // Read sequence number, and snapshot ID
       LOG.debug("Reading sequence number as snapshot generation, "
           + "and snapshot ID");
-      final String trimmedStr =
-          line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
-      final Scanner input = new Scanner(trimmedStr);
-      // This would the snapshot generation for the nodes to come
-      reconstructionSnapshotGeneration = input.nextLong();
-      // This is the snapshotID assigned to every single CompactionNode to come
-      reconstructionLastSnapshotID = input.nextLine().trim();
+      String[] splits = line.split(" ");
+      assert (splits.length == 5);

Review Comment:
   Thanks for the debugging. Fixed it.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -174,6 +198,14 @@ public RocksDBCheckpointDiffer(String metadataDir, String sstBackupDir,
 
     // Active DB location is used in getSSTFileSummary
     this.activeDBLocationStr = activeDBLocation.toString() + "/";
+
+    this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
+    this.executor = Executors.newSingleThreadScheduledExecutor();
+    this.executor.scheduleWithFixedDelay(
+        this::pruneOlderSnapshotsWithCompactionHistory,
+        0,
+        pruneCompactionDagDaemonRunIntervalInMs,
+        TimeUnit.MILLISECONDS);

Review Comment:
   This is fine for now. Later we may create a `BackgroundTask` and move all 
the compaction pruning tasks (DAG pruning, compaction log pruning, and SST file 
pruning) there.
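
   A minimal sketch of how a periodic pruning task like the one scheduled in 
this hunk could be wrapped. `PruneDaemonSketch` and its parameters are 
hypothetical names, not part of the PR. One detail worth guarding against: 
`scheduleWithFixedDelay` suppresses all later runs if one execution throws, so 
the sketch catches runtime exceptions inside the task.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper; the PR schedules the task directly in the constructor
// of RocksDBCheckpointDiffer.
public class PruneDaemonSketch implements AutoCloseable {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();

  public PruneDaemonSketch(Runnable pruneTask, long intervalMs) {
    executor.scheduleWithFixedDelay(() -> {
      try {
        pruneTask.run();
      } catch (RuntimeException e) {
        // An exception escaping the task would cancel every future run,
        // so log it and let the next scheduled run proceed.
        System.err.println("Prune run failed: " + e);
      }
    }, 0, intervalMs, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() {
    // Stop the background thread so it does not outlive its owner.
    executor.shutdownNow();
  }
}
```

   Closing the executor on shutdown is the part that is easy to forget when 
the scheduling lives in a constructor.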



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -148,17 +153,36 @@ public class RocksDBCheckpointDiffer {
    */
   private boolean skipGetSSTFileSummary = false;
 
+  /**
+   * A queue which keeps the snapshot in shorted order of their creations time.
+   * It is used by DAG pruning daemon to remove snapshots older than allowed
+   * time in compaction DAG.
+   */
+  private final Queue<Pair<Long, String>> snapshots = new LinkedList<>();

Review Comment:
   As discussed offline, it is fine to keep it as is because this queue is 
bounded by time. We can use the global snapshot chain later because it is 
sorted by time.
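
   For illustration, a sketch of how a creation-time-ordered queue stays 
bounded: the pruning pass only inspects the head and stops at the first entry 
that is still young enough. The class and method names here are hypothetical, 
and `Map.Entry` stands in for the `Pair` used in the PR so the snippet needs 
only the JDK.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for the `snapshots` queue and its pruning pass.
public class SnapshotQueueSketch {
  // Entries are (creationTimeMs, snapshotDir), appended in creation order.
  private final Queue<Map.Entry<Long, String>> snapshots = new LinkedList<>();
  private final long maxAllowedTimeInDagMs;

  public SnapshotQueueSketch(long maxAllowedTimeInDagMs) {
    this.maxAllowedTimeInDagMs = maxAllowedTimeInDagMs;
  }

  public synchronized void add(long creationTimeMs, String snapshotDir) {
    snapshots.add(new SimpleImmutableEntry<>(creationTimeMs, snapshotDir));
  }

  // Removes and returns the directories of snapshots older than the limit.
  // Because the queue is ordered by creation time, the loop can stop at the
  // first entry that is still within the allowed window.
  public synchronized List<String> pollOlderThan(long nowMs) {
    List<String> expired = new ArrayList<>();
    while (!snapshots.isEmpty()
        && nowMs - snapshots.peek().getKey() > maxAllowedTimeInDagMs) {
      expired.add(snapshots.poll().getValue());
    }
    return expired;
  }

  public synchronized int size() {
    return snapshots.size();
  }
}
```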



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -309,13 +350,15 @@ private synchronized void appendToCurrentCompactionLog(String content) {
   /**
    * Append a sequence number to the compaction log (roughly) when an Ozone
    * snapshot (RDB checkpoint) is taken.
-   * @param sequenceNum RDB sequence number
    */
   public void appendSequenceNumberToCompactionLog(long sequenceNum,
-      String snapshotID) {
+                                                  String snapshotID,
+                                                  String snapshotDir,
+                                                  long creationTime) {
     final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum +
-        " " + snapshotID + "\n";
+        " " + snapshotID + " " + snapshotDir + " " + creationTime + "\n";

Review Comment:
   It is a relative path, so that should not happen.
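
   A sketch of the round trip for this line format, assuming the concern in 
this thread is a space inside `snapshotDir`: the line is space-delimited, so 
such a path would change the token count. The prefix value `"S "` and the class 
name are assumptions made for the example, not taken from the PR. The sketch 
validates with an explicit check rather than `assert`, since Java assertions 
are disabled unless the JVM is started with `-ea`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper mirroring the write and read sides of the log line.
public class SeqNumLineSketch {
  // Assumed value; the real COMPACTION_LOG_SEQNUM_LINE_PREFIX may differ.
  static final String PREFIX = "S ";

  public static String format(long sequenceNum, String snapshotID,
                              String snapshotDir, long creationTime) {
    return PREFIX + sequenceNum + " " + snapshotID + " " + snapshotDir
        + " " + creationTime + "\n";
  }

  // Returns (creationTime, snapshotDir); rejects lines that do not have
  // exactly five space-separated tokens.
  public static Map.Entry<Long, String> parse(String line) {
    String[] splits = line.trim().split(" ");
    if (splits.length != 5) {
      throw new IllegalArgumentException(
          "Expected 5 tokens but found " + splits.length + ": " + line.trim());
    }
    long createdAt = Long.parseLong(splits[4]);
    return new SimpleImmutableEntry<>(createdAt, splits[3]);
  }
}
```

   With this check, a directory containing a space fails loudly at parse time 
instead of silently shifting the fields.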



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -600,13 +638,14 @@ synchronized void processCompactionLogLine(String line) {
       // Read sequence number, and snapshot ID
       LOG.debug("Reading sequence number as snapshot generation, "
           + "and snapshot ID");
-      final String trimmedStr =
-          line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
-      final Scanner input = new Scanner(trimmedStr);
-      // This would the snapshot generation for the nodes to come
-      reconstructionSnapshotGeneration = input.nextLong();
-      // This is the snapshotID assigned to every single CompactionNode to come
-      reconstructionLastSnapshotID = input.nextLine().trim();
+      String[] splits = line.split(" ");
+      assert (splits.length == 5);
+
+      reconstructionSnapshotGeneration = Long.parseLong(splits[1]);
+      reconstructionLastSnapshotID = splits[2];
+      String snapshotDir = splits[3];
+      long createdAt = Long.parseLong(splits[4]);
+      snapshots.add(Pair.of(createdAt, snapshotDir));

Review Comment:
   Same as previous. 



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java:
##########
@@ -151,7 +151,9 @@ public static DBCheckpoint createOmSnapshotCheckpoint(
     // Write snapshot generation (latest sequence number) to compaction log.
     // This will be used for DAG reconstruction as snapshotGeneration.
     dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber,

Review Comment:
   +1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

