smengcl commented on code in PR #4045:
URL: https://github.com/apache/ozone/pull/4045#discussion_r1054736685
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +980,290 @@ private void populateCompactionDAG(List<String>
inputFiles,
}
+ /**
+ * This is the task definition which is run periodically by the service
+ * executor at fixed delay.
+ * It looks for snapshots in compaction DAG which are older than the allowed
+ * time to be in compaction DAG and removes them from the DAG.
+ */
+ public void pruneOlderSnapshotsWithCompactionHistory() {
+ List<Path> olderSnapshotsLogFilePaths =
+ getOlderSnapshotsCompactionLogFilePaths();
+
+ List<String> lastCompactionSstFiles =
+ getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+
+ Set<String> sstFileNodesRemoved =
+ pruneSnapshotFileNodesFromDag(lastCompactionSstFiles);
+ removeSstFile(sstFileNodesRemoved);
+ deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+ }
+
+ /**
+ * Deletes the SST file from the backup directory if exists.
+ */
+ private void removeSstFile(Set<String> sstFileNodes) {
+ for (String sstFileNode: sstFileNodes) {
+ File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+ try {
+ Files.deleteIfExists(file.toPath());
Review Comment:
Note: `deleteIfExists` is fine. But in which case would SST not be found
when it is being deleted here. Is it worth throwing a warning at least if the
file doesn't exist?
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +980,290 @@ private void populateCompactionDAG(List<String>
inputFiles,
}
+ /**
+ * This is the task definition which is run periodically by the service
+ * executor at fixed delay.
+ * It looks for snapshots in compaction DAG which are older than the allowed
+ * time to be in compaction DAG and removes them from the DAG.
+ */
+ public void pruneOlderSnapshotsWithCompactionHistory() {
+ List<Path> olderSnapshotsLogFilePaths =
+ getOlderSnapshotsCompactionLogFilePaths();
+
+ List<String> lastCompactionSstFiles =
+ getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+
+ Set<String> sstFileNodesRemoved =
+ pruneSnapshotFileNodesFromDag(lastCompactionSstFiles);
+ removeSstFile(sstFileNodesRemoved);
+ deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+ }
+
+ /**
+ * Deletes the SST file from the backup directory if exists.
+ */
+ private void removeSstFile(Set<String> sstFileNodes) {
+ for (String sstFileNode: sstFileNodes) {
+ File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+ try {
+ Files.deleteIfExists(file.toPath());
+ } catch (IOException exception) {
+ LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
+ }
+ }
+ }
+
+ /**
+ * Returns the list of compaction log files which are older than allowed
+ * max time in the compaction DAG.
+ */
+ private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+ List<Path> olderSnapshotLogPaths = new ArrayList<>();
+
+ long compactionLogPruneStartTime = System.currentTimeMillis();
+
+ List<Path> compactionFiles =
+ listCompactionLogFileFromCompactionLogDirectory();
+
+ for (Path compactionLogPath : compactionFiles) {
+ SnapshotLogInfo snapshotLogInfo =
+ getSnapshotInfoFromLog(compactionLogPath);
+
+ if (snapshotLogInfo == null) {
Review Comment:
In this case, the log would still need to be included for the traversal
later in `getLastCompactionSstFiles(Path)` right?
Otherwise, what happens if the first line in the next compaction log file is
an "S" entry? (Can this happen in the first place?) Then when looking for the
last "C" line, it can fall in the log file that is being ignored in the current
logic. Do you think this could become a problem?
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +980,290 @@ private void populateCompactionDAG(List<String>
inputFiles,
}
+ /**
+ * This is the task definition which is run periodically by the service
+ * executor at fixed delay.
+ * It looks for snapshots in compaction DAG which are older than the allowed
+ * time to be in compaction DAG and removes them from the DAG.
+ */
+ public void pruneOlderSnapshotsWithCompactionHistory() {
+ List<Path> olderSnapshotsLogFilePaths =
+ getOlderSnapshotsCompactionLogFilePaths();
+
+ List<String> lastCompactionSstFiles =
+ getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+
+ Set<String> sstFileNodesRemoved =
+ pruneSnapshotFileNodesFromDag(lastCompactionSstFiles);
+ removeSstFile(sstFileNodesRemoved);
+ deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+ }
+
+ /**
+ * Deletes the SST file from the backup directory if exists.
+ */
+ private void removeSstFile(Set<String> sstFileNodes) {
+ for (String sstFileNode: sstFileNodes) {
+ File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+ try {
+ Files.deleteIfExists(file.toPath());
+ } catch (IOException exception) {
+ LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
+ }
+ }
+ }
+
+ /**
+ * Returns the list of compaction log files which are older than allowed
+ * max time in the compaction DAG.
+ */
+ private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+ List<Path> olderSnapshotLogPaths = new ArrayList<>();
+
+ long compactionLogPruneStartTime = System.currentTimeMillis();
+
+ List<Path> compactionFiles =
+ listCompactionLogFileFromCompactionLogDirectory();
+
+ for (Path compactionLogPath : compactionFiles) {
+ SnapshotLogInfo snapshotLogInfo =
+ getSnapshotInfoFromLog(compactionLogPath);
+
+ if (snapshotLogInfo == null) {
+ continue;
+ }
+
+ if (maxAllowedTimeInDag >
+ compactionLogPruneStartTime - snapshotLogInfo.snapshotCreatedAt) {
+ break;
+ }
+
+ olderSnapshotLogPaths.add(compactionLogPath);
+ }
+
+ return olderSnapshotLogPaths;
+ }
+
+ /**
+ * Returns the list of compaction log file path from compaction log
directory.
+ */
+ private List<Path> listCompactionLogFileFromCompactionLogDirectory() {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase()
+ .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
+ .sorted()) {
+ return pathStream.collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ public void deleteOlderSnapshotsCompactionFiles(
+ List<Path> olderSnapshotsLogFilePaths) {
+
+ for (int i = 0; i < olderSnapshotsLogFilePaths.size(); i++) {
+ Path olderSnapshotsLogFilePath = olderSnapshotsLogFilePaths.get(i);
+ try {
+ Files.deleteIfExists(olderSnapshotsLogFilePath);
+ } catch (IOException exception) {
+ LOG.error("Failed to deleted SST file: {}", olderSnapshotsLogFilePath,
+ exception);
+ }
+ }
+ }
+
+ /**
+ * Prunes forward and backward DAGs when oldest snapshot with compaction
+ * history gets deleted.
+ */
+ public Set<String > pruneSnapshotFileNodesFromDag(List<String> sstFileNodes)
{
+ Set<CompactionNode> startNodes = new HashSet<>();
+ for (String sstFileNode : sstFileNodes) {
+ CompactionNode infileNode = compactionNodeMap.get(sstFileNode);
+ if (infileNode == null) {
+ LOG.warn("Compaction node doesn't exist for sstFile: {}.",
sstFileNode);
+ continue;
+ }
+
+ startNodes.add(infileNode);
+ }
+
+ pruneBackwardDag(backwardCompactionDAG, startNodes);
+ Set<String> sstFilesPruned = pruneForwardDag(forwardCompactionDAG,
+ startNodes);
+
+ // Remove SST file nodes from compactionNodeMap too,
+ // since those nodes won't be needed after clean up.
+ sstFilesPruned.forEach(compactionNodeMap::remove);
+
+ return sstFilesPruned;
+ }
+
+ /**
+ * Prunes backward DAG's upstream from the level, that needs to be removed.
+ */
+ @VisibleForTesting
+ Set<String> pruneBackwardDag(MutableGraph<CompactionNode> backwardDag,
+ Set<CompactionNode> startNodes) {
+ Set<String> removedFiles = new HashSet<>();
+ Set<CompactionNode> currentLevel = startNodes;
+
+ while (!currentLevel.isEmpty()) {
+ Set<CompactionNode> nextLevel = new HashSet<>();
+ for (CompactionNode current : currentLevel) {
+ if (!backwardDag.nodes().contains(current)) {
+ continue;
+ }
+
+ nextLevel.addAll(backwardDag.predecessors(current));
+ backwardDag.removeNode(current);
+ removedFiles.add(current.getFileName());
+ }
+ currentLevel = nextLevel;
+ }
+
+ return removedFiles;
+ }
+
+ /**
+ * Prunes forward DAG's downstream from the level that needs to be removed.
+ */
+ @VisibleForTesting
+ Set<String> pruneForwardDag(MutableGraph<CompactionNode> forwardDag,
+ Set<CompactionNode> startNodes) {
+ Set<String> removedFiles = new HashSet<>();
+ Set<CompactionNode> currentLevel = new HashSet<>(startNodes);
+
+ while (!currentLevel.isEmpty()) {
+ Set<CompactionNode> nextLevel = new HashSet<>();
+ for (CompactionNode current : currentLevel) {
+ if (!forwardDag.nodes().contains(current)) {
+ continue;
+ }
+
+ nextLevel.addAll(forwardDag.successors(current));
+ forwardDag.removeNode(current);
+ removedFiles.add(current.getFileName());
+ }
+
+ currentLevel = nextLevel;
+ }
+
+ return removedFiles;
+ }
+
+ private SnapshotLogInfo getSnapshotInfoFromLog(Path compactionLogFile) {
+ AtomicReference<SnapshotLogInfo> snapshotLogInfo =
+ new AtomicReference<>();
+ try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+ logStream.forEach(logLine -> {
+ if (!logLine.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
+ return;
+ }
+
+ snapshotLogInfo.set(getSnapshotLogInfo(logLine));
+ });
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to read compaction log file: " +
+ compactionLogFile, exception);
+ }
+
+ return snapshotLogInfo.get();
+ }
+
+ /**
+ * Converts a snapshot compaction log line to SnapshotLogInfo.
+ */
+ private SnapshotLogInfo getSnapshotLogInfo(String line) {
+ String[] splits = line.split(" ");
+ assert (splits.length == 4);
+
+ return new SnapshotLogInfo(Long.parseLong(splits[1]),
+ splits[2],
+ Long.parseLong(splits[3]));
+ }
+
+ /**
+ * Returns the list of SST files got compacted in the last compaction from
+ * the provided list of compaction log files.
+ * We can't simply use last file from the list because it is possible that
+ * no compaction happened between the last snapshot and previous to that.
+ * Hence, we go over the list in reverse order and return the SST files from
+ * first the compaction happened in the reverse list.
+ * If no compaction happen at all, it returns empty list.
+ */
+ @VisibleForTesting
+ List<String> getLastCompactionSstFiles(
+ List<Path> compactionLogFiles
+ ) {
+
+ if (compactionLogFiles.isEmpty()) {
+ return Collections.emptyList();
+ }
+ compactionLogFiles = new ArrayList<>(compactionLogFiles);
+ Collections.reverse(compactionLogFiles);
+
+ for (Path compactionLogFile: compactionLogFiles) {
+ List<String> sstFiles = getLastCompactionSstFiles(compactionLogFile);
+ if (!sstFiles.isEmpty()) {
+ return sstFiles;
+ }
+ }
+
+ return Collections.emptyList();
+ }
+
+ private List<String> getLastCompactionSstFiles(Path compactionLogFile) {
+
+ AtomicReference<String> sstFiles = new AtomicReference<>();
+
+ try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+ logStream.forEach(logLine -> {
+ if (!logLine.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ return;
+ }
+ sstFiles.set(logLine);
+ });
Review Comment:
This is a creative way to locate the last "C " compaction line. I wonder how
costly atomic set is tho.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +980,290 @@ private void populateCompactionDAG(List<String>
inputFiles,
}
+ /**
+ * This is the task definition which is run periodically by the service
+ * executor at fixed delay.
+ * It looks for snapshots in compaction DAG which are older than the allowed
+ * time to be in compaction DAG and removes them from the DAG.
+ */
+ public void pruneOlderSnapshotsWithCompactionHistory() {
+ List<Path> olderSnapshotsLogFilePaths =
+ getOlderSnapshotsCompactionLogFilePaths();
+
+ List<String> lastCompactionSstFiles =
+ getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+
+ Set<String> sstFileNodesRemoved =
+ pruneSnapshotFileNodesFromDag(lastCompactionSstFiles);
+ removeSstFile(sstFileNodesRemoved);
+ deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+ }
+
+ /**
+ * Deletes the SST file from the backup directory if exists.
+ */
+ private void removeSstFile(Set<String> sstFileNodes) {
+ for (String sstFileNode: sstFileNodes) {
+ File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+ try {
+ Files.deleteIfExists(file.toPath());
+ } catch (IOException exception) {
+ LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
+ }
+ }
+ }
+
+ /**
+ * Returns the list of compaction log files which are older than allowed
+ * max time in the compaction DAG.
+ */
+ private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+ List<Path> olderSnapshotLogPaths = new ArrayList<>();
+
+ long compactionLogPruneStartTime = System.currentTimeMillis();
+
+ List<Path> compactionFiles =
+ listCompactionLogFileFromCompactionLogDirectory();
+
+ for (Path compactionLogPath : compactionFiles) {
+ SnapshotLogInfo snapshotLogInfo =
+ getSnapshotInfoFromLog(compactionLogPath);
+
+ if (snapshotLogInfo == null) {
+ continue;
+ }
+
+ if (maxAllowedTimeInDag >
+ compactionLogPruneStartTime - snapshotLogInfo.snapshotCreatedAt) {
+ break;
+ }
+
+ olderSnapshotLogPaths.add(compactionLogPath);
+ }
+
+ return olderSnapshotLogPaths;
+ }
+
+ /**
+ * Returns the list of compaction log file path from compaction log
directory.
+ */
+ private List<Path> listCompactionLogFileFromCompactionLogDirectory() {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase()
+ .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
+ .sorted()) {
+ return pathStream.collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ public void deleteOlderSnapshotsCompactionFiles(
+ List<Path> olderSnapshotsLogFilePaths) {
+
+ for (int i = 0; i < olderSnapshotsLogFilePaths.size(); i++) {
+ Path olderSnapshotsLogFilePath = olderSnapshotsLogFilePaths.get(i);
+ try {
+ Files.deleteIfExists(olderSnapshotsLogFilePath);
+ } catch (IOException exception) {
+ LOG.error("Failed to deleted SST file: {}", olderSnapshotsLogFilePath,
+ exception);
+ }
+ }
+ }
+
+ /**
+ * Prunes forward and backward DAGs when oldest snapshot with compaction
+ * history gets deleted.
+ */
+ public Set<String > pruneSnapshotFileNodesFromDag(List<String> sstFileNodes)
{
+ Set<CompactionNode> startNodes = new HashSet<>();
+ for (String sstFileNode : sstFileNodes) {
+ CompactionNode infileNode = compactionNodeMap.get(sstFileNode);
+ if (infileNode == null) {
+ LOG.warn("Compaction node doesn't exist for sstFile: {}.",
sstFileNode);
+ continue;
+ }
+
+ startNodes.add(infileNode);
+ }
+
+ pruneBackwardDag(backwardCompactionDAG, startNodes);
+ Set<String> sstFilesPruned = pruneForwardDag(forwardCompactionDAG,
+ startNodes);
+
+ // Remove SST file nodes from compactionNodeMap too,
+ // since those nodes won't be needed after clean up.
+ sstFilesPruned.forEach(compactionNodeMap::remove);
+
+ return sstFilesPruned;
+ }
+
+ /**
+ * Prunes backward DAG's upstream from the level, that needs to be removed.
+ */
+ @VisibleForTesting
+ Set<String> pruneBackwardDag(MutableGraph<CompactionNode> backwardDag,
+ Set<CompactionNode> startNodes) {
+ Set<String> removedFiles = new HashSet<>();
+ Set<CompactionNode> currentLevel = startNodes;
+
+ while (!currentLevel.isEmpty()) {
+ Set<CompactionNode> nextLevel = new HashSet<>();
+ for (CompactionNode current : currentLevel) {
+ if (!backwardDag.nodes().contains(current)) {
+ continue;
+ }
+
+ nextLevel.addAll(backwardDag.predecessors(current));
+ backwardDag.removeNode(current);
+ removedFiles.add(current.getFileName());
+ }
+ currentLevel = nextLevel;
+ }
+
+ return removedFiles;
+ }
+
+ /**
+ * Prunes forward DAG's downstream from the level that needs to be removed.
+ */
+ @VisibleForTesting
+ Set<String> pruneForwardDag(MutableGraph<CompactionNode> forwardDag,
+ Set<CompactionNode> startNodes) {
+ Set<String> removedFiles = new HashSet<>();
+ Set<CompactionNode> currentLevel = new HashSet<>(startNodes);
+
+ while (!currentLevel.isEmpty()) {
+ Set<CompactionNode> nextLevel = new HashSet<>();
+ for (CompactionNode current : currentLevel) {
+ if (!forwardDag.nodes().contains(current)) {
+ continue;
+ }
+
+ nextLevel.addAll(forwardDag.successors(current));
+ forwardDag.removeNode(current);
+ removedFiles.add(current.getFileName());
+ }
+
+ currentLevel = nextLevel;
+ }
+
+ return removedFiles;
+ }
+
+ private SnapshotLogInfo getSnapshotInfoFromLog(Path compactionLogFile) {
+ AtomicReference<SnapshotLogInfo> snapshotLogInfo =
+ new AtomicReference<>();
+ try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+ logStream.forEach(logLine -> {
+ if (!logLine.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
+ return;
+ }
+
+ snapshotLogInfo.set(getSnapshotLogInfo(logLine));
+ });
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to read compaction log file: " +
+ compactionLogFile, exception);
+ }
+
+ return snapshotLogInfo.get();
+ }
+
+ /**
+ * Converts a snapshot compaction log line to SnapshotLogInfo.
+ */
+ private SnapshotLogInfo getSnapshotLogInfo(String line) {
+ String[] splits = line.split(" ");
+ assert (splits.length == 4);
+
+ return new SnapshotLogInfo(Long.parseLong(splits[1]),
+ splits[2],
+ Long.parseLong(splits[3]));
+ }
+
+ /**
+ * Returns the list of SST files got compacted in the last compaction from
+ * the provided list of compaction log files.
+ * We can't simply use last file from the list because it is possible that
+ * no compaction happened between the last snapshot and previous to that.
+ * Hence, we go over the list in reverse order and return the SST files from
+ * first the compaction happened in the reverse list.
+ * If no compaction happen at all, it returns empty list.
+ */
+ @VisibleForTesting
+ List<String> getLastCompactionSstFiles(
+ List<Path> compactionLogFiles
+ ) {
+
+ if (compactionLogFiles.isEmpty()) {
+ return Collections.emptyList();
+ }
+ compactionLogFiles = new ArrayList<>(compactionLogFiles);
+ Collections.reverse(compactionLogFiles);
+
+ for (Path compactionLogFile: compactionLogFiles) {
+ List<String> sstFiles = getLastCompactionSstFiles(compactionLogFile);
+ if (!sstFiles.isEmpty()) {
+ return sstFiles;
+ }
+ }
+
+ return Collections.emptyList();
+ }
+
+ private List<String> getLastCompactionSstFiles(Path compactionLogFile) {
+
+ AtomicReference<String> sstFiles = new AtomicReference<>();
+
+ try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+ logStream.forEach(logLine -> {
+ if (!logLine.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ return;
+ }
+ sstFiles.set(logLine);
+ });
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to read file: " + compactionLogFile,
+ exception);
+ }
+
+ String lastCompactionLogEntry = sstFiles.get();
+
+ if (StringUtils.isEmpty(lastCompactionLogEntry)) {
+ return Collections.emptyList();
+ }
+
+ // Trim the beginning
+ lastCompactionLogEntry = lastCompactionLogEntry
+ .substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
+
+ String[] io = lastCompactionLogEntry
+ .split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
+
+ assert (io.length == 2);
Review Comment:
ICYDK `assert`s are ignored by JVM unless explicitly enabled with arg `-ea`.
If that is not desired, use Guava `Preconditions.checkArgument` instead.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +980,290 @@ private void populateCompactionDAG(List<String>
inputFiles,
}
+ /**
+ * This is the task definition which is run periodically by the service
+ * executor at fixed delay.
+ * It looks for snapshots in compaction DAG which are older than the allowed
+ * time to be in compaction DAG and removes them from the DAG.
+ */
+ public void pruneOlderSnapshotsWithCompactionHistory() {
+ List<Path> olderSnapshotsLogFilePaths =
+ getOlderSnapshotsCompactionLogFilePaths();
+
+ List<String> lastCompactionSstFiles =
+ getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+
+ Set<String> sstFileNodesRemoved =
+ pruneSnapshotFileNodesFromDag(lastCompactionSstFiles);
+ removeSstFile(sstFileNodesRemoved);
+ deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+ }
+
+ /**
+ * Deletes the SST file from the backup directory if exists.
+ */
+ private void removeSstFile(Set<String> sstFileNodes) {
+ for (String sstFileNode: sstFileNodes) {
+ File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+ try {
+ Files.deleteIfExists(file.toPath());
+ } catch (IOException exception) {
+ LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
+ }
+ }
+ }
+
+ /**
+ * Returns the list of compaction log files which are older than allowed
+ * max time in the compaction DAG.
+ */
+ private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+ List<Path> olderSnapshotLogPaths = new ArrayList<>();
+
+ long compactionLogPruneStartTime = System.currentTimeMillis();
+
+ List<Path> compactionFiles =
+ listCompactionLogFileFromCompactionLogDirectory();
+
+ for (Path compactionLogPath : compactionFiles) {
+ SnapshotLogInfo snapshotLogInfo =
+ getSnapshotInfoFromLog(compactionLogPath);
+
+ if (snapshotLogInfo == null) {
Review Comment:
Note: `snapshotLogInfo == null` indicates this compaction log file doesn't
have any snapshot entry. Likely due to OM restarts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]