hemantk-12 commented on code in PR #4045:
URL: https://github.com/apache/ozone/pull/4045#discussion_r1054795770


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +980,290 @@ private void populateCompactionDAG(List<String> inputFiles,
 
   }
 
+  /**
+   * This is the task definition that is run periodically by the service
+   * executor at a fixed delay.
+   * It looks for snapshots in the compaction DAG that are older than the
+   * maximum allowed time and removes them from the DAG.
+   */
+  public void pruneOlderSnapshotsWithCompactionHistory() {
+    List<Path> olderSnapshotsLogFilePaths =
+        getOlderSnapshotsCompactionLogFilePaths();
+
+    List<String> lastCompactionSstFiles =
+        getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+
+    Set<String> sstFileNodesRemoved =
+        pruneSnapshotFileNodesFromDag(lastCompactionSstFiles);
+    removeSstFile(sstFileNodesRemoved);
+    deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+  }
+
+  /**
+   * Deletes the given SST files from the backup directory if they exist.
+   */
+  private void removeSstFile(Set<String> sstFileNodes) {
+    for (String sstFileNode: sstFileNodes) {
+      File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+      try {
+        Files.deleteIfExists(file.toPath());
+      } catch (IOException exception) {
+        LOG.warn("Failed to delete SST file: {}", sstFileNode, exception);
+      }
+    }
+  }
+
+  /**
+   * Returns the list of compaction log files which are older than the
+   * maximum time allowed in the compaction DAG.
+   */
+  private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+    List<Path> olderSnapshotLogPaths = new ArrayList<>();
+
+    long compactionLogPruneStartTime = System.currentTimeMillis();
+
+    List<Path> compactionFiles =
+        listCompactionLogFileFromCompactionLogDirectory();
+
+    for (Path compactionLogPath : compactionFiles) {
+      SnapshotLogInfo snapshotLogInfo =
+          getSnapshotInfoFromLog(compactionLogPath);
+
+      if (snapshotLogInfo == null) {
+        continue;
+      }
+
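+      // The log files are sorted (oldest first), so once we find a snapshot
+      // that is still within the allowed age, all later ones are too.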
+      if (maxAllowedTimeInDag >
+          compactionLogPruneStartTime - snapshotLogInfo.snapshotCreatedAt) {
+        break;
+      }
+
+      olderSnapshotLogPaths.add(compactionLogPath);
+    }
+
+    return olderSnapshotLogPaths;
+  }
+
+  /**
+   * Returns the list of compaction log file paths from the compaction log
+   * directory.
+   */
+  private List<Path> listCompactionLogFileFromCompactionLogDirectory() {
+    try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+        .filter(e -> e.toString().toLowerCase()
+            .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
+        .sorted()) {
+      return pathStream.collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Error listing compaction log dir " +
+          compactionLogDir, e);
+    }
+  }
+
+  public void deleteOlderSnapshotsCompactionFiles(
+      List<Path> olderSnapshotsLogFilePaths) {
+
+    for (Path olderSnapshotsLogFilePath : olderSnapshotsLogFilePaths) {
+      try {
+        Files.deleteIfExists(olderSnapshotsLogFilePath);
+      } catch (IOException exception) {
+        LOG.error("Failed to delete compaction log file: {}",
+            olderSnapshotsLogFilePath, exception);
+      }
+    }
+  }
+
+  /**
+   * Prunes the forward and backward DAGs when the oldest snapshot with
+   * compaction history gets deleted.
+   */
+  public Set<String> pruneSnapshotFileNodesFromDag(
+      List<String> sstFileNodes) {
+    Set<CompactionNode> startNodes = new HashSet<>();
+    for (String sstFileNode : sstFileNodes) {
+      CompactionNode infileNode = compactionNodeMap.get(sstFileNode);
+      if (infileNode == null) {
+        LOG.warn("Compaction node doesn't exist for sstFile: {}.",
+            sstFileNode);
+        continue;
+      }
+
+      startNodes.add(infileNode);
+    }
+
+    pruneBackwardDag(backwardCompactionDAG, startNodes);
+    Set<String> sstFilesPruned = pruneForwardDag(forwardCompactionDAG,
+        startNodes);
+
+    // Remove SST file nodes from compactionNodeMap too,
+    // since those nodes won't be needed after clean up.
+    sstFilesPruned.forEach(compactionNodeMap::remove);
+
+    return sstFilesPruned;
+  }
+
+  /**
+   * Prunes the backward DAG upstream from the level that needs to be
+   * removed.
+   */
+  @VisibleForTesting
+  Set<String> pruneBackwardDag(MutableGraph<CompactionNode> backwardDag,
+                               Set<CompactionNode> startNodes) {
+    Set<String> removedFiles = new HashSet<>();
+    Set<CompactionNode> currentLevel = startNodes;
+
+    while (!currentLevel.isEmpty()) {
+      Set<CompactionNode> nextLevel = new HashSet<>();
+      for (CompactionNode current : currentLevel) {
+        if (!backwardDag.nodes().contains(current)) {
+          continue;
+        }
+
+        nextLevel.addAll(backwardDag.predecessors(current));
+        backwardDag.removeNode(current);
+        removedFiles.add(current.getFileName());
+      }
+      currentLevel = nextLevel;
+    }
+
+    return removedFiles;
+  }
+
+  /**
+   * Prunes the forward DAG downstream from the level that needs to be
+   * removed.
+   */
+  @VisibleForTesting
+  Set<String> pruneForwardDag(MutableGraph<CompactionNode> forwardDag,
+                              Set<CompactionNode> startNodes) {
+    Set<String> removedFiles = new HashSet<>();
+    Set<CompactionNode> currentLevel = new HashSet<>(startNodes);
+
+    while (!currentLevel.isEmpty()) {
+      Set<CompactionNode> nextLevel = new HashSet<>();
+      for (CompactionNode current : currentLevel) {
+        if (!forwardDag.nodes().contains(current)) {
+          continue;
+        }
+
+        nextLevel.addAll(forwardDag.successors(current));
+        forwardDag.removeNode(current);
+        removedFiles.add(current.getFileName());
+      }
+
+      currentLevel = nextLevel;
+    }
+
+    return removedFiles;
+  }
+
+  private SnapshotLogInfo getSnapshotInfoFromLog(Path compactionLogFile) {
+    AtomicReference<SnapshotLogInfo> snapshotLogInfo =
+        new AtomicReference<>();
+    try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+      logStream.forEach(logLine -> {
+        if (!logLine.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
+          return;
+        }
+
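+        // Later matches overwrite earlier ones, so the value ends up being
+        // the last sequence-number line in the file.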
+        snapshotLogInfo.set(getSnapshotLogInfo(logLine));
+      });
+    } catch (IOException exception) {
+      throw new RuntimeException("Failed to read compaction log file: " +
+          compactionLogFile, exception);
+    }
+
+    return snapshotLogInfo.get();
+  }
+
+  /**
+   * Converts a snapshot compaction log line to SnapshotLogInfo.
+   */
+  private SnapshotLogInfo getSnapshotLogInfo(String line) {
+    String[] splits = line.split(" ");
+    assert (splits.length == 4);
+
+    return new SnapshotLogInfo(Long.parseLong(splits[1]),
+        splits[2],
+        Long.parseLong(splits[3]));
+  }
+
+  /**
+   * Returns the list of SST files that were compacted in the last compaction,
+   * from the provided list of compaction log files.
+   * We can't simply use the last file in the list because it is possible that
+   * no compaction happened between the last snapshot and the one before it.
+   * Hence, we go over the list in reverse order and return the SST files from
+   * the first compaction found in the reversed list.
+   * If no compaction happened at all, an empty list is returned.
+   */
+  @VisibleForTesting
+  List<String> getLastCompactionSstFiles(List<Path> compactionLogFiles) {
+
+    if (compactionLogFiles.isEmpty()) {
+      return Collections.emptyList();
+    }
+    compactionLogFiles = new ArrayList<>(compactionLogFiles);
+    Collections.reverse(compactionLogFiles);
+
+    for (Path compactionLogFile: compactionLogFiles) {
+      List<String> sstFiles = getLastCompactionSstFiles(compactionLogFile);
+      if (!sstFiles.isEmpty()) {
+        return sstFiles;
+      }
+    }
+
+    return Collections.emptyList();
+  }
+
+  private List<String> getLastCompactionSstFiles(Path compactionLogFile) {
+
+    AtomicReference<String> sstFiles = new AtomicReference<>();
+
+    try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+      logStream.forEach(logLine -> {
+        if (!logLine.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+          return;
+        }
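+        // Keep overwriting so that the last compaction entry in the file wins.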
+        sstFiles.set(logLine);
+      });
+    } catch (IOException exception) {
+      throw new RuntimeException("Failed to read file: " + compactionLogFile,
+          exception);
+    }
+
+    String lastCompactionLogEntry = sstFiles.get();
+
+    if (StringUtils.isEmpty(lastCompactionLogEntry)) {
+      return Collections.emptyList();
+    }
+
+    // Trim the log entry prefix from the beginning of the line.
+    lastCompactionLogEntry = lastCompactionLogEntry
+        .substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
+
+    String[] io = lastCompactionLogEntry
+        .split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
+
+    assert (io.length == 2);

Review Comment:
   I was not aware of that. I wrote a test and it was failing, so I thought it should work. I'll change it to Guava's `Preconditions.check`.
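
   For reference, a minimal sketch of the kind of check that could replace the `assert` (the class, method, and delimiter constant below are hypothetical stand-ins; the real code uses `COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER`). Unlike `assert`, Guava's `Preconditions` checks run even when the JVM is started without `-ea`:

   ```java
   import com.google.common.base.Preconditions;

   public final class CompactionLogEntrySketch {
     // Hypothetical stand-in for COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER.
     private static final String INPUT_OUTPUT_DELIMITER = ":";

     static String[] splitInputOutput(String lastCompactionLogEntry) {
       String[] io = lastCompactionLogEntry.split(INPUT_OUTPUT_DELIMITER);
       // Always enforced, unlike `assert`, which is disabled by default.
       Preconditions.checkArgument(io.length == 2,
           "Invalid compaction log entry '%s': expected exactly one '%s' "
               + "delimiter between input and output file lists.",
           lastCompactionLogEntry, INPUT_OUTPUT_DELIMITER);
       return io;
     }
   }
   ```

   `checkArgument` throws `IllegalArgumentException` with the formatted message on failure, so malformed log entries fail loudly in production rather than silently passing when assertions are off.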



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

