hemantk-12 commented on code in PR #4045:
URL: https://github.com/apache/ozone/pull/4045#discussion_r1048974909
##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3407,4 +3407,22 @@
</description>
</property>
+ <property>
+ <name>ozone.om.snapshot.compaction.dag.max.time.allowed.ms</name>
+ <value>30d</value>
Review Comment:
It does.
https://github.com/apache/ozone/blob/1c96a5b32c4cbf8dab81694d6cf5730633b5f44e/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/TimeDurationUtil.java#L49
and
https://github.com/apache/ozone/blob/1c96a5b32c4cbf8dab81694d6cf5730633b5f44e/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/TimeDurationUtil.java#L152
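For reference, a minimal sketch (not code from this PR) of how a suffixed value such as `30d` resolves to milliseconds; it assumes Hadoop's `Configuration.getTimeDuration`, which applies the same suffix parsing as the linked `TimeDurationUtil`:

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

public class DurationSuffixExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key name taken from this PR; the value carries its own unit suffix.
    conf.set("ozone.om.snapshot.compaction.dag.max.time.allowed.ms", "30d");

    // getTimeDuration() parses the "d" suffix and converts the value
    // to the requested unit (milliseconds here).
    long maxAllowedTimeInDag = conf.getTimeDuration(
        "ozone.om.snapshot.compaction.dag.max.time.allowed.ms",
        0, TimeUnit.MILLISECONDS);

    System.out.println(maxAllowedTimeInDag); // 2592000000 (30 days in ms)
  }
}
```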
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java:
##########
@@ -534,6 +534,23 @@ public final class OzoneConfigKeys {
public static final String OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE =
"ozone.om.snapshot.cache.max.size";
public static final int OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT = 10;
+
+ public static final String
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_MS =
+ "ozone.om.snapshot.compaction.dag.max.time.allowed.ms";
Review Comment:
Makes sense. Will remove it.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -939,6 +978,114 @@ private void populateCompactionDAG(List<String> inputFiles,
 }
+ /**
+ * This is the task definition that is run periodically by the service
+ * executor at a fixed delay.
+ * It looks for snapshots in the compaction DAG that are older than the
+ * maximum allowed time and removes them from the DAG.
+ */
+ public void pruneOlderSnapshotsWithCompactionHistory() {
+ String snapshotDir = null;
+ long currentTimeMillis = System.currentTimeMillis();
+
+ while (!snapshots.isEmpty() &&
+ (currentTimeMillis - snapshots.peek().getLeft())
+ > maxAllowedTimeInDag) {
+ snapshotDir = snapshots.poll().getRight();
+ }
+
+ if (snapshotDir != null) {
+ pruneSnapshotFileNodesFromDag(snapshotDir);
+ }
+ }
+
+ /**
+ * Prunes the forward and backward DAGs when the oldest snapshot with
+ * compaction history gets deleted.
+ */
+ public void pruneSnapshotFileNodesFromDag(String snapshotDir) {
+ Set<String> snapshotSstFiles = readRocksDBLiveFiles(snapshotDir);
+ if (snapshotSstFiles.isEmpty()) {
+ LOG.info("Snapshot '{}' doesn't have any sst file to remove.",
+ snapshotDir);
+ return;
+ }
+
+ Set<CompactionNode> startNodes = new HashSet<>();
+ for (String sstFile: snapshotSstFiles) {
+ CompactionNode infileNode = compactionNodeMap.get(sstFile);
+ if (infileNode == null) {
+ LOG.error("Compaction node doesn't exist for sstFile: {}.", sstFile);
+ continue;
+ }
+
+ startNodes.add(infileNode);
+ }
+
+ pruneBackwardDag(backwardCompactionDAG, startNodes);
+ Set<String> sstFilesPruned = pruneForwardDag(forwardCompactionDAG,
+ startNodes);
+
+ // Remove SST file nodes from compactionNodeMap too,
+ // since those nodes won't be needed after clean up.
+ sstFilesPruned.forEach(compactionNodeMap::remove);
+
+ LOG.info("Pruned SST nodes from DAG: {}.", sstFilesPruned);
+ }
+
+ /**
+ * Prunes the backward DAG upstream from the level that needs to be removed.
+ */
+ @VisibleForTesting
+ Set<String> pruneBackwardDag(MutableGraph<CompactionNode> backwardDag,
+ Set<CompactionNode> startNodes) {
+ Set<String> removedFiles = new HashSet<>();
+ Set<CompactionNode> currentLevel = startNodes;
+
+ while (!currentLevel.isEmpty()) {
+ Set<CompactionNode> nextLevel = new HashSet<>();
+ for (CompactionNode current : currentLevel) {
+ if (!backwardDag.nodes().contains(current)) {
+ continue;
Review Comment:
Yes, it will not be entered. It is mostly to make sure that the `startNodes`
passed as parameters are present in the DAG, in case `compactionNodeMap`
contains an extra node. Otherwise `graph.predecessors(node)` or
`graph.successors(node)` would throw `IllegalArgumentException`.
For example:
Snapshot1 contains: A, B
Snapshot2 contains: A, B, C, D
Snapshot3 contains: A, B, C, D, E, F
Compaction happens after Snapshot3, and the next set of files is G, H, I, J, K.
Files G, H, I come from the compaction of (A, B, C, D, E, F), and J, K are new files.
Snapshot4 contains: G, H, I, J, K
In the first run of the daemon, Snapshot1 is deleted and nodes A and B are
removed from the DAG.
The second run of the daemon deletes Snapshot2. Because A and B were already
removed in the previous run, they won't be present in the DAG during the
second run.
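As a standalone illustration of that failure mode (plain Guava, not code from this PR): querying a node that was already pruned in a previous run throws, which is exactly what the `contains` guard avoids:

```java
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;

public class AbsentNodeGuardExample {
  public static void main(String[] args) {
    MutableGraph<String> backwardDag = GraphBuilder.directed().build();
    // Illustrative edges only: G was produced by compacting A and B.
    backwardDag.putEdge("G", "A");
    backwardDag.putEdge("G", "B");

    // The first daemon run pruned A and B along with Snapshot1.
    backwardDag.removeNode("A");
    backwardDag.removeNode("B");

    // The second run starts from Snapshot2's files, which still list A and B.
    for (String node : new String[] {"A", "B", "G"}) {
      if (!backwardDag.nodes().contains(node)) {
        continue; // same guard as in pruneBackwardDag()
      }
      System.out.println(node + " -> " + backwardDag.predecessors(node));
    }

    // Without the guard, backwardDag.predecessors("A") would throw
    // IllegalArgumentException because A is no longer a node in the graph.
  }
}
```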
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java:
##########
@@ -369,6 +355,55 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ) {
}
}
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 2, 3, 4, 5})
+ public void testRemoveSnapshotFromDag(int snapshotIndex) throws Exception {
+ String clDirStr = "compaction-log";
+ String metadataDirStr = ".";
+ String sstDirStr = "compaction-sst-backup";
+ List<List<String>> expectedDiff = asList(
+ asList("000024", "000017", "000028", "000026", "000019", "000021"),
+ asList("000024", "000028", "000026", "000019", "000021"),
+ asList("000024", "000028", "000026", "000021"),
+ asList("000024", "000028", "000026"),
+ asList("000028", "000026"),
+ Collections.singletonList("000028"),
+ Collections.emptyList()
Review Comment:
No, removed.