This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
new 9fe6d106ac HDDS-7524. Compaction DAG node pruning (#4045)
9fe6d106ac is described below
commit 9fe6d106acc71fa79402880ba6344fcab131185d
Author: Hemant Kumar <[email protected]>
AuthorDate: Tue Jan 3 01:45:47 2023 -0800
HDDS-7524. Compaction DAG node pruning (#4045)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 17 +
.../common/src/main/resources/ozone-default.xml | 19 +
.../hadoop/hdds/utils/db/DBStoreBuilder.java | 20 +-
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 11 +-
hadoop-hdds/rocksdb-checkpoint-differ/pom.xml | 1 +
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 493 +++++++++++++++----
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 534 ++++++++++++++++++---
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 5 +-
8 files changed, 957 insertions(+), 143 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 74ed487e8c..8011e35e31 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -534,6 +534,23 @@ public final class OzoneConfigKeys {
public static final String OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE =
"ozone.om.snapshot.cache.max.size";
public static final int OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT = 10;
+
+ public static final String
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED =
+ "ozone.om.snapshot.compaction.dag.max.time.allowed";
+
+ public static final long
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT =
+ TimeUnit.DAYS.toMillis(30);
+
+ public static final String
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL =
+ "ozone.om.snapshot.compaction.dag.prune.daemon.run.interval";
+
+ public static final long
+ OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT =
+ TimeUnit.HOURS.toMillis(1);
+
/**
* There is no need to instantiate this class.
*/
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 3e8499bf71..64977ecd32 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3407,4 +3407,23 @@
</description>
</property>
+ <property>
+ <name>ozone.om.snapshot.compaction.dag.max.time.allowed</name>
+ <value>30d</value>
+ <tag>OZONE, OM</tag>
+ <description>
+ Maximum time a snapshot is allowed to be in the compaction DAG before it is pruned out by the pruning daemon.
+ Uses milliseconds by default when no time unit is specified.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.om.snapshot.compaction.dag.prune.daemon.run.interval</name>
+ <value>3600s</value>
+ <tag>OZONE, OM</tag>
+ <description>
+ Interval at which the compaction DAG pruning daemon runs to remove older snapshots with compaction history from the compaction DAG.
+ Uses milliseconds by default when no time unit is specified.
+ </description>
+ </property>
</configuration>
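For context, both new keys are time-duration settings, so values may carry a unit suffix (e.g. "30d", "3600s") and resolve to milliseconds when read. Below is a minimal sketch of how they resolve via getTimeDuration (the class name and the "7d" override are illustrative only, not part of this change; DBStoreBuilder in this diff reads the keys the same way):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    public class SnapshotPruneConfigSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Illustrative override; any unit suffix understood by getTimeDuration works.
        conf.set("ozone.om.snapshot.compaction.dag.max.time.allowed", "7d");
        long maxTimeInDagMs = conf.getTimeDuration(
            "ozone.om.snapshot.compaction.dag.max.time.allowed",
            TimeUnit.DAYS.toMillis(30),       // default introduced by this change
            TimeUnit.MILLISECONDS);           // "7d" resolves to 604800000 ms
        long pruneIntervalMs = conf.getTimeDuration(
            "ozone.om.snapshot.compaction.dag.prune.daemon.run.interval",
            TimeUnit.HOURS.toMillis(1),       // default introduced by this change
            TimeUnit.MILLISECONDS);
        System.out.println(maxTimeInDagMs + " " + pruneIntervalMs);
      }
    }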
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 5aad1aa526..c3b0151cb9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -44,6 +45,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKS
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT;
import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -98,6 +103,8 @@ public final class DBStoreBuilder {
private int maxFSSnapshots = 0;
private final DBProfile defaultCfProfile;
private boolean enableCompactionLog;
+ private long maxTimeAllowedForSnapshotInDag;
+ private long pruneCompactionDagDaemonRunInterval;
/**
* Create DBStoreBuilder from a generic DBDefinition.
@@ -145,6 +152,16 @@ public final class DBStoreBuilder {
defaultCfProfile = this.configuration.getEnum(HDDS_DB_PROFILE,
HDDS_DEFAULT_DB_PROFILE);
LOG.debug("Default DB profile:{}", defaultCfProfile);
+
+ maxTimeAllowedForSnapshotInDag = configuration.getTimeDuration(
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED,
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ pruneCompactionDagDaemonRunInterval = configuration.getTimeDuration(
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL,
+ OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
}
private void applyDBDefinition(DBDefinition definition) {
@@ -201,7 +218,8 @@ public final class DBStoreBuilder {
return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName,
- enableCompactionLog);
+ enableCompactionLog, maxTimeAllowedForSnapshotInDag,
+ pruneCompactionDagDaemonRunInterval);
} finally {
tableConfigs.forEach(TableConfig::close);
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index adafe079a8..2535ff6619 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.utils.RocksDBStoreMBean;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
@@ -80,14 +81,17 @@ public class RDBStore implements DBStore {
public RDBStore(File dbFile, ManagedDBOptions options,
Set<TableConfig> families) throws IOException {
this(dbFile, options, new ManagedWriteOptions(), families,
- new CodecRegistry(), false, 1000, null, false);
+ new CodecRegistry(), false, 1000, null, false,
+ TimeUnit.DAYS.toMillis(1), TimeUnit.HOURS.toMillis(1));
}
@SuppressWarnings("parameternumber")
public RDBStore(File dbFile, ManagedDBOptions dbOptions,
ManagedWriteOptions writeOptions, Set<TableConfig> families,
CodecRegistry registry, boolean readOnly, int maxFSSnapshots,
- String dbJmxBeanNameName, boolean enableCompactionLog)
+ String dbJmxBeanNameName, boolean enableCompactionLog,
+ long maxTimeAllowedForSnapshotInDag,
+ long compactionDagDaemonInterval)
throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
Preconditions.checkNotNull(families);
@@ -101,7 +105,8 @@ public class RDBStore implements DBStore {
if (enableCompactionLog) {
rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(
dbLocation.getParent(), dbCompactionSSTBackupDirName,
- dbCompactionLogDirName, dbLocation);
+ dbCompactionLogDirName, dbLocation, maxTimeAllowedForSnapshotInDag,
+ compactionDagDaemonInterval);
rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
} else {
rocksDBCheckpointDiffer = null;
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index b288364f73..3c28aac8cb 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -88,6 +88,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 7a8ade9f5f..9b015b24e1 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -21,6 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -53,7 +58,6 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -83,7 +87,7 @@ import static java.util.Arrays.asList;
* It is important to note that compaction log is per-DB instance. Since
* each OM DB instance might trigger compactions at different timings.
*/
-public class RocksDBCheckpointDiffer {
+public class RocksDBCheckpointDiffer implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(RocksDBCheckpointDiffer.class);
@@ -91,7 +95,7 @@ public class RocksDBCheckpointDiffer {
private final String sstBackupDir;
private final String activeDBLocationStr;
- private String compactionLogDir = null;
+ private final String compactionLogDir;
/**
* Compaction log path for DB compaction history persistence.
@@ -103,7 +107,7 @@ public class RocksDBCheckpointDiffer {
*/
private volatile String currentCompactionLogPath = null;
- private static final String COMPACTION_LOG_FILENAME_SUFFIX = ".log";
+ static final String COMPACTION_LOG_FILE_NAME_SUFFIX = ".log";
/**
* Marks the beginning of a comment line in the compaction log.
@@ -119,7 +123,22 @@ public class RocksDBCheckpointDiffer {
* Prefix for the sequence number line when writing to compaction log
* right after taking an Ozone snapshot.
*/
- private static final String COMPACTION_LOG_SEQNUM_LINE_PREFIX = "S ";
+ private static final String COMPACTION_LOG_SEQ_NUM_LINE_PREFIX = "S ";
+
+ /**
+ * Delimiter used to join compaction's input and output files.
+ * e.g. input1,input2,input3 or output1,output2,output3
+ */
+ private static final String COMPACTION_LOG_ENTRY_FILE_DELIMITER = ",";
+
+ private static final String SPACE_DELIMITER = " ";
+
+ /**
+ * Delimiter used to join compaction's input and output file set strings.
+ * e.g. input1,input2,input3:output1,output2,output3
+ */
+ private static final String COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER
+ = ":";
/**
* SST file extension. Must be lower case.
@@ -130,7 +149,7 @@ public class RocksDBCheckpointDiffer {
private static final int SST_FILE_EXTENSION_LENGTH =
SST_FILE_EXTENSION.length();
- private static final int LONG_MAX_STRLEN =
+ private static final int LONG_MAX_STR_LEN =
String.valueOf(Long.MAX_VALUE).length();
/**
@@ -143,83 +162,107 @@ public class RocksDBCheckpointDiffer {
* Dummy object that acts as a write lock in compaction listener.
*/
private final Object compactionListenerWriteLock = new Object();
- /**
- * Flag for testing. Skips SST file summary reader.
- */
- private boolean skipGetSSTFileSummary = false;
+
+ private final ScheduledExecutorService executor;
+ private final long maxAllowedTimeInDag;
/**
* Constructor.
* Note that previous compaction logs are loaded by RDBStore after this
* object's initialization by calling loadAllCompactionLogs().
+ *
* @param metadataDir Ozone metadata directory.
* @param sstBackupDir Name of the SST backup dir under metadata dir.
* @param compactionLogDirName Name of the compaction log dir.
+ * @param activeDBLocation Active RocksDB directory's location.
+ * @param maxTimeAllowedForSnapshotInDagInMs Time after which a snapshot will be
+ * pruned from the DAG by the daemon.
+ * @param pruneCompactionDagDaemonRunIntervalInMs Interval at which the DAG
+ * pruning daemon will run.
*/
- public RocksDBCheckpointDiffer(String metadataDir, String sstBackupDir,
- String compactionLogDirName, File activeDBLocation) {
-
- setCompactionLogDir(metadataDir, compactionLogDirName);
-
+ public RocksDBCheckpointDiffer(String metadataDir,
+ String sstBackupDir,
+ String compactionLogDirName,
+ File activeDBLocation,
+ long maxTimeAllowedForSnapshotInDagInMs,
+ long pruneCompactionDagDaemonRunIntervalInMs) {
+ this.compactionLogDir =
+ createCompactionLogDir(metadataDir, compactionLogDirName);
this.sstBackupDir = Paths.get(metadataDir, sstBackupDir) + "/";
-
- // Create the directory if SST backup path does not already exist
- File dir = new File(this.sstBackupDir);
- if (!dir.exists() && !dir.mkdir()) {
- final String errorMsg = "Failed to create SST file backup directory. "
- + "Check if OM has write permission.";
- LOG.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
+ createSstBackUpDir();
// Active DB location is used in getSSTFileSummary
this.activeDBLocationStr = activeDBLocation.toString() + "/";
+ this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
+ this.executor = Executors.newSingleThreadScheduledExecutor();
+
+ if (pruneCompactionDagDaemonRunIntervalInMs > 0) {
+ this.executor.scheduleWithFixedDelay(
+ this::pruneOlderSnapshotsWithCompactionHistory,
+ pruneCompactionDagDaemonRunIntervalInMs,
+ pruneCompactionDagDaemonRunIntervalInMs,
+ TimeUnit.MILLISECONDS);
+ }
}
- /**
- * This constructor is only meant for unit testing.
- */
- @VisibleForTesting
- RocksDBCheckpointDiffer() {
- this.skipGetSSTFileSummary = true;
- this.sstBackupDir = null;
- this.activeDBLocationStr = null;
+ public RocksDBCheckpointDiffer(String sstBackupDir,
+ String compactionLogDirName,
+ String activeDBLocationName,
+ long maxTimeAllowedForSnapshotInDagInMs) {
+ this.compactionLogDir = compactionLogDirName;
+ this.sstBackupDir = sstBackupDir;
+ this.activeDBLocationStr = activeDBLocationName;
+ this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
+ this.executor = null;
}
- private void setCompactionLogDir(String metadataDir,
- String compactionLogDirName) {
+ private String createCompactionLogDir(String metadataDir,
+ String compactionLogDirName) {
final File parentDir = new File(metadataDir);
if (!parentDir.exists()) {
if (!parentDir.mkdir()) {
LOG.error("Error creating compaction log parent dir.");
- return;
+ return null;
}
}
- this.compactionLogDir =
+ final String compactionLogDirectory =
Paths.get(metadataDir, compactionLogDirName).toString();
- File clDir = new File(compactionLogDir);
+ File clDir = new File(compactionLogDirectory);
if (!clDir.exists() && !clDir.mkdir()) {
LOG.error("Error creating compaction log dir.");
- return;
+ return null;
}
// Create a readme file explaining what the compaction log dir is for
- final Path readmePath = Paths.get(compactionLogDir, "_README.txt");
+ final Path readmePath = Paths.get(compactionLogDirectory, "_README.txt");
final File readmeFile = new File(readmePath.toString());
if (!readmeFile.exists()) {
try (BufferedWriter bw = Files.newBufferedWriter(
readmePath, StandardOpenOption.CREATE)) {
- bw.write("This directory holds Ozone Manager RocksDB compaction logs.\n"
- + "DO NOT add, change or delete any files in this directory unless "
- + "you know what you are doing.\n");
+ bw.write("This directory holds Ozone Manager RocksDB compaction" +
+ " logs.\nDO NOT add, change or delete any files in this directory" +
+ " unless you know what you are doing.\n");
} catch (IOException ignored) {
}
}
- // Append /
- this.compactionLogDir += "/";
+ // Append '/' to make it dir.
+ return compactionLogDirectory + "/";
+ }
+
+ /**
+ * Create the directory if SST backup path does not already exist.
+ */
+ private void createSstBackUpDir() {
+ File dir = new File(this.sstBackupDir);
+ if (!dir.exists() && !dir.mkdir()) {
+ String errorMsg = "Failed to create SST file backup directory. "
+ + "Check if OM has write permission.";
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
}
/**
@@ -229,16 +272,16 @@ public class RocksDBCheckpointDiffer {
public void setCurrentCompactionLog(long latestSequenceNum) {
String latestSequenceIdStr = String.valueOf(latestSequenceNum);
- if (latestSequenceIdStr.length() < LONG_MAX_STRLEN) {
+ if (latestSequenceIdStr.length() < LONG_MAX_STR_LEN) {
// Pad zeroes to the left for ordered file listing when sorted
// alphabetically.
latestSequenceIdStr =
- StringUtils.leftPad(latestSequenceIdStr, LONG_MAX_STRLEN, "0");
+ StringUtils.leftPad(latestSequenceIdStr, LONG_MAX_STR_LEN, "0");
}
// Local temp variable for storing the new compaction log file path
- final String newCompactionLog =
- compactionLogDir + latestSequenceIdStr + COMPACTION_LOG_FILENAME_SUFFIX;
+ final String newCompactionLog = compactionLogDir + latestSequenceIdStr +
+ COMPACTION_LOG_FILE_NAME_SUFFIX;
File clFile = new File(newCompactionLog);
if (clFile.exists()) {
@@ -251,6 +294,13 @@ public class RocksDBCheckpointDiffer {
appendToCurrentCompactionLog("");
}
+ @Override
+ public void close() throws Exception {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+
// Hash table to track CompactionNode for a given SST File.
private final ConcurrentHashMap<String, CompactionNode> compactionNodeMap =
new ConcurrentHashMap<>();
@@ -309,12 +359,12 @@ public class RocksDBCheckpointDiffer {
/**
* Append a sequence number to the compaction log (roughly) when an Ozone
* snapshot (RDB checkpoint) is taken.
- * @param sequenceNum RDB sequence number
*/
- public void appendSequenceNumberToCompactionLog(long sequenceNum,
- String snapshotID) {
- final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum +
- " " + snapshotID + "\n";
+ public void appendSnapshotInfoToCompactionLog(long sequenceNum,
+ String snapshotID,
+ long creationTime) {
+ final String line = COMPACTION_LOG_SEQ_NUM_LINE_PREFIX + sequenceNum +
+ SPACE_DELIMITER + snapshotID + SPACE_DELIMITER + creationTime + "\n";
appendToCurrentCompactionLog(line);
}
@@ -430,17 +480,19 @@ public class RocksDBCheckpointDiffer {
// Trim the file path, leave only the SST file name without extension
inputFiles.replaceAll(s -> s.substring(
filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
- final String inputFilesJoined = String.join(",", inputFiles);
+ final String inputFilesJoined =
+ String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, inputFiles);
sb.append(inputFilesJoined);
- // Insert delimiter between input files an output files
- sb.append(':');
+ // Insert delimiter between input files and output files
+ sb.append(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
// Append the list of output files
final List<String> outputFiles = compactionJobInfo.outputFiles();
outputFiles.replaceAll(s -> s.substring(
filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
- final String outputFilesJoined = String.join(",", outputFiles);
+ final String outputFilesJoined =
+ String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, outputFiles);
sb.append(outputFilesJoined);
// End of line
@@ -454,11 +506,6 @@ public class RocksDBCheckpointDiffer {
// snapshotChainManager.getLatestGlobalSnapshot()
populateCompactionDAG(inputFiles, outputFiles, null,
db.getLatestSequenceNumber());
-/*
- if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
- printMutableGraph(null, null, compactionDAGFwd);
- }
- */
}
}
};
@@ -471,7 +518,7 @@ public class RocksDBCheckpointDiffer {
*/
private long getSSTFileSummary(String filename) throws RocksDBException {
- if (skipGetSSTFileSummary) {
+ if (activeDBLocationStr == null) {
// For testing only
return 1L;
}
@@ -596,23 +643,18 @@ public class RocksDBCheckpointDiffer {
if (line.startsWith("#")) {
// Skip comments
LOG.debug("Comment line, skipped");
- } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
- // Read sequence number, and snapshot ID
- LOG.debug("Reading sequence number as snapshot generation, "
- + "and snapshot ID");
- final String trimmedStr =
- line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
- final Scanner input = new Scanner(trimmedStr);
- // This would the snapshot generation for the nodes to come
- reconstructionSnapshotGeneration = input.nextLong();
- // This is the snapshotID assigned to every single CompactionNode to come
- reconstructionLastSnapshotID = input.nextLine().trim();
+ } else if (line.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
+ SnapshotLogInfo snapshotLogInfo = getSnapshotLogInfo(line);
+ reconstructionSnapshotGeneration = snapshotLogInfo.snapshotGenerationId;
+ reconstructionLastSnapshotID = snapshotLogInfo.snapshotId;
} else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
// Read compaction log entry
// Trim the beginning
line = line.substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
- final String[] io = line.split(":");
+ String[] io =
+ line.split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
+
if (io.length != 2) {
if (line.endsWith(":")) {
LOG.debug("Ignoring compaction log line for SST deletion");
@@ -621,8 +663,9 @@ public class RocksDBCheckpointDiffer {
}
return;
}
- final String[] inputFiles = io[0].split(",");
- final String[] outputFiles = io[1].split(",");
+
+ String[] inputFiles = io[0].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
+ String[] outputFiles = io[1].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
populateCompactionDAG(asList(inputFiles), asList(outputFiles),
reconstructionLastSnapshotID, reconstructionSnapshotGeneration);
} else {
@@ -655,7 +698,7 @@ public class RocksDBCheckpointDiffer {
try {
try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
.filter(e -> e.toString().toLowerCase()
- .endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+ .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
.sorted()) {
for (Path logPath : pathStream.collect(Collectors.toList())) {
readCompactionLogToDAG(logPath.toString());
@@ -699,14 +742,14 @@ public class RocksDBCheckpointDiffer {
logSB.append("Fwd DAG same SST files: ");
for (String file : fwdDAGSameFiles) {
- logSB.append(file).append(" ");
+ logSB.append(file).append(SPACE_DELIMITER);
}
LOG.debug(logSB.toString());
logSB.setLength(0);
logSB.append("Fwd DAG different SST files: ");
for (String file : fwdDAGDifferentFiles) {
- logSB.append(file).append(" ");
+ logSB.append(file).append(SPACE_DELIMITER);
}
LOG.debug("{}", logSB);
}
@@ -939,6 +982,295 @@ public class RocksDBCheckpointDiffer {
}
+ /**
+ * This task is run periodically by the service executor at a fixed delay.
+ * It looks for snapshots in the compaction DAG that are older than the
+ * allowed time and removes them from the DAG.
+ */
+ public void pruneOlderSnapshotsWithCompactionHistory() {
+ List<Path> olderSnapshotsLogFilePaths =
+ getOlderSnapshotsCompactionLogFilePaths();
+ List<String> lastCompactionSstFiles =
+ getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+
+ Set<String> sstFileNodesRemoved =
+ pruneSnapshotFileNodesFromDag(lastCompactionSstFiles);
+ removeSstFile(sstFileNodesRemoved);
+ deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+ }
+
+ /**
+ * Deletes the SST file from the backup directory if it exists.
+ */
+ private void removeSstFile(Set<String> sstFileNodes) {
+ for (String sstFileNode: sstFileNodes) {
+ File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+ try {
+ Files.deleteIfExists(file.toPath());
+ } catch (IOException exception) {
+ LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
+ }
+ }
+ }
+
+ /**
+ * Returns the list of compaction log files which are older than the allowed
+ * max time in the compaction DAG.
+ */
+ private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+ long compactionLogPruneStartTime = System.currentTimeMillis();
+
+ List<Path> compactionFiles =
+ listCompactionLogFileFromCompactionLogDirectory();
+
+ int index = compactionFiles.size() - 1;
+ for (; index >= 0; index--) {
+ Path compactionLogPath = compactionFiles.get(index);
+ SnapshotLogInfo snapshotLogInfo =
+ getSnapshotInfoFromLog(compactionLogPath);
+
+ if (snapshotLogInfo == null) {
+ continue;
+ }
+
+ if (compactionLogPruneStartTime - snapshotLogInfo.snapshotCreatedAt >
+ maxAllowedTimeInDag) {
+ break;
+ }
+ }
+
+ if (index >= 0) {
+ return compactionFiles.subList(0, index + 1);
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ /**
+ * Returns the list of compaction log file paths from the compaction log directory.
+ */
+ private List<Path> listCompactionLogFileFromCompactionLogDirectory() {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase()
+ .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
+ .sorted()) {
+ return pathStream.collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ public void deleteOlderSnapshotsCompactionFiles(
+ List<Path> olderSnapshotsLogFilePaths) {
+
+ for (int i = 0; i < olderSnapshotsLogFilePaths.size(); i++) {
+ Path olderSnapshotsLogFilePath = olderSnapshotsLogFilePaths.get(i);
+ try {
+ Files.deleteIfExists(olderSnapshotsLogFilePath);
+ } catch (IOException exception) {
+ LOG.error("Failed to deleted SST file: {}", olderSnapshotsLogFilePath,
+ exception);
+ }
+ }
+ }
+
+ /**
+ * Prunes forward and backward DAGs when oldest snapshot with compaction
+ * history gets deleted.
+ */
+ public Set<String> pruneSnapshotFileNodesFromDag(List<String> sstFileNodes) {
+ Set<CompactionNode> startNodes = new HashSet<>();
+ for (String sstFileNode : sstFileNodes) {
+ CompactionNode infileNode = compactionNodeMap.get(sstFileNode);
+ if (infileNode == null) {
+ LOG.warn("Compaction node doesn't exist for sstFile: {}.",
sstFileNode);
+ continue;
+ }
+
+ startNodes.add(infileNode);
+ }
+
+ pruneBackwardDag(backwardCompactionDAG, startNodes);
+ Set<String> sstFilesPruned = pruneForwardDag(forwardCompactionDAG,
+ startNodes);
+
+ // Remove SST file nodes from compactionNodeMap too,
+ // since those nodes won't be needed after clean up.
+ sstFilesPruned.forEach(compactionNodeMap::remove);
+
+ return sstFilesPruned;
+ }
+
+ /**
+ * Prunes the backward DAG upstream from the level that needs to be removed.
+ */
+ @VisibleForTesting
+ Set<String> pruneBackwardDag(MutableGraph<CompactionNode> backwardDag,
+ Set<CompactionNode> startNodes) {
+ Set<String> removedFiles = new HashSet<>();
+ Set<CompactionNode> currentLevel = startNodes;
+
+ while (!currentLevel.isEmpty()) {
+ Set<CompactionNode> nextLevel = new HashSet<>();
+ for (CompactionNode current : currentLevel) {
+ if (!backwardDag.nodes().contains(current)) {
+ continue;
+ }
+
+ nextLevel.addAll(backwardDag.predecessors(current));
+ backwardDag.removeNode(current);
+ removedFiles.add(current.getFileName());
+ }
+ currentLevel = nextLevel;
+ }
+
+ return removedFiles;
+ }
+
+ /**
+ * Prunes the forward DAG downstream from the level that needs to be removed.
+ */
+ @VisibleForTesting
+ Set<String> pruneForwardDag(MutableGraph<CompactionNode> forwardDag,
+ Set<CompactionNode> startNodes) {
+ Set<String> removedFiles = new HashSet<>();
+ Set<CompactionNode> currentLevel = new HashSet<>(startNodes);
+
+ while (!currentLevel.isEmpty()) {
+ Set<CompactionNode> nextLevel = new HashSet<>();
+ for (CompactionNode current : currentLevel) {
+ if (!forwardDag.nodes().contains(current)) {
+ continue;
+ }
+
+ nextLevel.addAll(forwardDag.successors(current));
+ forwardDag.removeNode(current);
+ removedFiles.add(current.getFileName());
+ }
+
+ currentLevel = nextLevel;
+ }
+
+ return removedFiles;
+ }
+
+ private SnapshotLogInfo getSnapshotInfoFromLog(Path compactionLogFile) {
+ AtomicReference<SnapshotLogInfo> snapshotLogInfo =
+ new AtomicReference<>();
+ try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+ logStream.forEach(logLine -> {
+ if (!logLine.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
+ return;
+ }
+
+ snapshotLogInfo.set(getSnapshotLogInfo(logLine));
+ });
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to read compaction log file: " +
+ compactionLogFile, exception);
+ }
+
+ return snapshotLogInfo.get();
+ }
+
+ /**
+ * Converts a snapshot compaction log line to SnapshotLogInfo.
+ */
+ private SnapshotLogInfo getSnapshotLogInfo(String logLine) {
+ // Remove `S ` from the line.
+ String line =
+ logLine.substring(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX.length());
+
+ String[] splits = line.split(SPACE_DELIMITER);
+ Preconditions.checkArgument(splits.length == 3,
+ "Snapshot info log statement has more than expected parameters.");
+
+ return new SnapshotLogInfo(Long.parseLong(splits[0]),
+ splits[1],
+ Long.parseLong(splits[2]));
+ }
+
+ /**
+ * Returns the list of SST files that were compacted in the last compaction,
+ * from the provided list of compaction log files.
+ * We can't simply use the last file in the list because it is possible that
+ * no compaction happened between the last snapshot and the one before it.
+ * Hence, we go over the list in reverse order and return the SST files from
+ * the first compaction found in the reversed list.
+ * If no compaction happened at all, an empty list is returned.
+ */
+ private List<String> getLastCompactionSstFiles(
+ List<Path> compactionLogFiles
+ ) {
+
+ if (compactionLogFiles.isEmpty()) {
+ return Collections.emptyList();
+ }
+ compactionLogFiles = new ArrayList<>(compactionLogFiles);
+ Collections.reverse(compactionLogFiles);
+
+ for (Path compactionLogFile: compactionLogFiles) {
+ List<String> sstFiles = getLastCompactionSstFiles(compactionLogFile);
+ if (!sstFiles.isEmpty()) {
+ return sstFiles;
+ }
+ }
+
+ return Collections.emptyList();
+ }
+
+ private List<String> getLastCompactionSstFiles(Path compactionLogFile) {
+
+ AtomicReference<String> sstFiles = new AtomicReference<>();
+
+ try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
+ logStream.forEach(logLine -> {
+ if (!logLine.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ return;
+ }
+ sstFiles.set(logLine);
+ });
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to read file: " + compactionLogFile,
+ exception);
+ }
+
+ String lastCompactionLogEntry = sstFiles.get();
+
+ if (StringUtils.isEmpty(lastCompactionLogEntry)) {
+ return Collections.emptyList();
+ }
+
+ // Trim the beginning
+ lastCompactionLogEntry = lastCompactionLogEntry
+ .substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
+
+ String[] io = lastCompactionLogEntry
+ .split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
+
+ assert (io.length == 2);
+
+ String[] outputFiles = io[1].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
+
+ return asList(outputFiles);
+ }
+
+ private static final class SnapshotLogInfo {
+ private final long snapshotGenerationId;
+ private final String snapshotId;
+ private final long snapshotCreatedAt;
+
+ private SnapshotLogInfo(long snapshotGenerationId,
+ String snapshotId,
+ long snapshotCreatedAt) {
+ this.snapshotGenerationId = snapshotGenerationId;
+ this.snapshotId = snapshotId;
+ this.snapshotCreatedAt = snapshotCreatedAt;
+ }
+ }
+
@VisibleForTesting
public boolean debugEnabled(Integer level) {
return DEBUG_LEVEL.contains(level);
@@ -953,5 +1285,4 @@ public class RocksDBCheckpointDiffer {
public ConcurrentHashMap<String, CompactionNode> getCompactionNodeMap() {
return compactionNodeMap;
}
-
}
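Taken together, the differ now owns a single-thread scheduled executor that prunes old snapshots from the DAG, and implements AutoCloseable so that executor can be shut down. A minimal sketch of driving it directly follows (the directory names, sequence number and snapshot ID are illustrative only; in Ozone this wiring is done by RDBStore and OmSnapshotManager as shown elsewhere in this diff):

    import java.io.File;
    import java.util.concurrent.TimeUnit;
    import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;

    public class CheckpointDifferSketch {
      public static void main(String[] args) throws Exception {
        File activeDbLocation = new File("/tmp/om-metadata/om.db");
        try (RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
            "/tmp/om-metadata",             // Ozone metadata dir (illustrative)
            "compaction-sst-backup",        // SST backup dir name under metadata dir
            "compaction-log",               // compaction log dir name
            activeDbLocation,
            TimeUnit.DAYS.toMillis(30),     // max time a snapshot may stay in the DAG
            TimeUnit.HOURS.toMillis(1))) {  // prune daemon run interval
          long latestSequenceNumber = 1000L;  // would come from RocksDB in practice
          differ.setCurrentCompactionLog(latestSequenceNumber);
          // Writes a line of the form "S <seqNum> <snapshotId> <creationTimeMs>";
          // compaction entries are written as "C in1,in2:out1" by the listener.
          differ.appendSnapshotInfoToCompactionLog(latestSequenceNumber,
              "df6410c7-151b-4e90-870e-5ef12875acd5", System.currentTimeMillis());
        } // close() shuts down the pruning daemon's executor
      }
    }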
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index 590e740575..af5aec638a 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -19,10 +19,8 @@ package org.apache.ozone.rocksdiff;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
-import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
-import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
-import static org.junit.jupiter.api.Assertions.fail;
-
+import static java.util.concurrent.TimeUnit.MINUTES;
+import com.google.common.graph.GraphBuilder;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -35,17 +33,19 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.graph.MutableGraph;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -65,6 +65,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
/**
* Test RocksDBCheckpointDiffer basic functionality.
*/
@@ -84,19 +92,10 @@ public class TestRocksDBCheckpointDiffer {
* RocksDB checkpoint path prefix.
*/
private static final String CP_PATH_PREFIX = "rocksdb-cp-";
- private final ArrayList<DifferSnapshotInfo> snapshots = new ArrayList<>();
-
- /**
- * Graph type.
- */
- enum GType {
- FNAME,
- KEYSIZE,
- CUMUTATIVE_SIZE
- }
+ private final List<DifferSnapshotInfo> snapshots = new ArrayList<>();
- @BeforeAll
- public static void init() {
+ @BeforeEach
+ public void init() {
// Checkpoint differ log level. Set to DEBUG for verbose output
GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.getLog(), Level.INFO);
// Test class log level. Set to DEBUG for verbose output
@@ -203,25 +202,33 @@ public class TestRocksDBCheckpointDiffer {
Set<String> expectedDiffSstFiles,
boolean expectingException) {
- RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer();
+ RocksDBCheckpointDiffer differ =
+ new RocksDBCheckpointDiffer(null, null, null, 0L);
boolean exceptionThrown = false;
+ long createdTime = System.currentTimeMillis();
String compactionLog = ""
- + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5\n" // Snapshot 0
+ // Snapshot 0
+ + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5 " + createdTime + " \n"
+ // Additional "compaction" to trigger and test early exit condition
+ "C 000001,000002:000062\n"
- // Additional "compaction" to trigger and test early exit condition
- + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5\n" // Snapshot 1
- + "C 000068,000062:000069\n" // Regular compaction
+ // Snapshot 1
+ + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5 " + createdTime + " \n"
+ // Regular compaction
+ + "C 000068,000062:000069\n"
+ // Trivial move
+ "C 000071,000064,000060,000052:000071,000064,000060,000052\n"
- // Trivial move
+ "C 000073,000066:000074\n"
+ "C 000082,000076,000069:000083\n"
+ "C 000087,000080,000074:000088\n"
- + "C 000093,000090,000083:\n" // Deletion?
- + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd\n" // Snapshot 2
+ // Deletion?
+ + "C 000093,000090,000083:\n"
+ // Snapshot 2
+ + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd " + createdTime + "\n"
+ "C 000098,000096,000085,000078,000071,000064,000060,000052:000099\n"
+ "C 000105,000095,000088:000107\n"
- + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea\n"; // Snapshot 3
+ // Snapshot 3
+ + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea " + createdTime + "\n";
// Construct DAG from compaction log input
Arrays.stream(compactionLog.split("\n")).forEach(
@@ -277,7 +284,9 @@ public class TestRocksDBCheckpointDiffer {
final File dbLocation = new File(TEST_DB_PATH);
RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
- metadataDirStr, sstDirStr, clDirStr, dbLocation);
+ metadataDirStr, sstDirStr, clDirStr, dbLocation,
+ TimeUnit.DAYS.toMillis(1),
+ MINUTES.toMillis(5));
// Empty the SST backup folder first for testing
File sstDir = new File(sstDirStr);
@@ -312,32 +321,9 @@ public class TestRocksDBCheckpointDiffer {
differ.dumpCompactionNodeTable();
}
- for (GType gtype : GType.values()) {
- String fname = "fwdGraph_" + gtype + ".png";
- String rname = "reverseGraph_" + gtype + ".png";
-/*
- differ.pngPrintMutableGrapth(differ.getCompactionFwdDAG(), fname, gtype);
- differ.pngPrintMutableGrapth(
- differ.getCompactionReverseDAG(), rname, gtype);
- */
- }
-
rocksDB.close();
}
- private String getRandomString(Random random, int length) {
- // Ref: https://www.baeldung.com/java-random-string
- final int leftLimit = 48; // numeral '0'
- final int rightLimit = 122; // letter 'z'
-
- return random.ints(leftLimit, rightLimit + 1)
- .filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
- .limit(length)
- .collect(StringBuilder::new,
- StringBuilder::appendCodePoint, StringBuilder::append)
- .toString();
- }
-
/**
* Test SST differ.
*/
@@ -396,8 +382,9 @@ public class TestRocksDBCheckpointDiffer {
this.snapshots.add(currentSnapshot);
// Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
- differ.appendSequenceNumberToCompactionLog(
- dbLatestSequenceNumber, snapshotId);
+ differ.appendSnapshotInfoToCompactionLog(dbLatestSequenceNumber,
+ snapshotId,
+ System.currentTimeMillis());
differ.setCurrentCompactionLog(dbLatestSequenceNumber);
@@ -453,10 +440,9 @@ public class TestRocksDBCheckpointDiffer {
differ.setCurrentCompactionLog(rocksDB.getLatestSequenceNumber());
- Random random = new Random();
// key-value
for (int i = 0; i < NUM_ROW; ++i) {
- String generatedString = getRandomString(random, 7);
+ String generatedString = RandomStringUtils.randomAlphabetic(7);
String keyStr = "Key-" + i + "-" + generatedString;
String valueStr = "Val-" + i + "-" + generatedString;
byte[] key = keyStr.getBytes(UTF_8);
@@ -678,4 +664,440 @@ public class TestRocksDBCheckpointDiffer {
LOG.debug("Files are: {}", allNodesSet);
}
-}
\ No newline at end of file
+
+ private static final List<List<String>> SST_FILES_BY_LEVEL = Arrays.asList(
+ Arrays.asList("000015", "000013", "000011", "000009"),
+ Arrays.asList("000018", "000016", "000017", "000026", "000024", "000022",
+ "000020"),
+ Arrays.asList("000027", "000030", "000028", "000029", "000031", "000039",
+ "000037", "000035", "000033"),
+ Arrays.asList("000040", "000044", "000042", "000043", "000045", "000041",
+ "000046", "000054", "000052", "000050", "000048"),
+ Arrays.asList("000059", "000055", "000056", "000060", "000057", "000058")
+ );
+
+ private static final List<List<CompactionNode>> COMPACTION_NODES_BY_LEVEL =
+ SST_FILES_BY_LEVEL.stream()
+ .map(sstFiles ->
+ sstFiles.stream()
+ .map(
+ sstFile -> new CompactionNode(sstFile,
+ UUID.randomUUID().toString(),
+ 1000L,
+ Long.parseLong(sstFile.substring(0, 6))
+ ))
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList());
+
+ /**
+ * Creates a backward compaction DAG from a list of level nodes.
+ * It assumes that at each level, files get compacted into the first half of
+ * the files at the next level.
+ * e.g. if level-1 has 7 files and level-2 has 9 files, the first 4 files
+ * at level-2 are from the compaction of level-1 and the rest are new.
+ */
+ private static MutableGraph<CompactionNode> createBackwardDagFromLevelNodes(
+ int fromLevel,
+ int toLevel
+ ) {
+ MutableGraph<CompactionNode> dag = GraphBuilder.directed().build();
+
+ if (fromLevel == toLevel) {
+ COMPACTION_NODES_BY_LEVEL.get(fromLevel).forEach(dag::addNode);
+ return dag;
+ }
+
+ for (int level = fromLevel; level < toLevel; level++) {
+ List<CompactionNode> currentLevel = COMPACTION_NODES_BY_LEVEL.get(level);
+ List<CompactionNode> nextLevel = COMPACTION_NODES_BY_LEVEL.get(level + 1);
+
+ for (int i = 0; i < currentLevel.size(); i++) {
+ for (int j = 0; j < nextLevel.size(); j++) {
+ dag.addNode(currentLevel.get(i));
+ dag.addNode(nextLevel.get(j));
+
+ int child = nextLevel.size();
+ if (level < COMPACTION_NODES_BY_LEVEL.size() - 2) {
+ child /= 2;
+ }
+
+ if (j < child) {
+ dag.putEdge(currentLevel.get(i), nextLevel.get(j));
+ }
+ }
+ }
+ }
+
+ return dag;
+ }
+
+ /**
+ * Creates a forward compaction DAG from a list of level nodes.
+ * It assumes that at each level, the first half of the files are from the
+ * compaction of the previous level.
+ * e.g. if level-1 has 7 files and level-2 has 9 files, the first 4 files
+ * at level-2 are from the compaction of level-1 and the rest are new.
+ */
+ private static MutableGraph<CompactionNode> createForwardDagFromLevelNodes(
+ int fromLevel,
+ int toLevel
+ ) {
+ MutableGraph<CompactionNode> dag = GraphBuilder.directed().build();
+
+ if (fromLevel == toLevel) {
+ COMPACTION_NODES_BY_LEVEL.get(fromLevel).forEach(dag::addNode);
+ return dag;
+ }
+
+ dag = GraphBuilder.directed().build();
+ for (int level = fromLevel; level > toLevel; level--) {
+ List<CompactionNode> currentLevel = COMPACTION_NODES_BY_LEVEL.get(level);
+ List<CompactionNode> nextLevel = COMPACTION_NODES_BY_LEVEL.get(level - 1);
+
+ for (int i = 0; i < currentLevel.size(); i++) {
+ for (int j = 0; j < nextLevel.size(); j++) {
+ dag.addNode(currentLevel.get(i));
+ dag.addNode(nextLevel.get(j));
+
+ int parent = currentLevel.size();
+ if (level < COMPACTION_NODES_BY_LEVEL.size() - 1) {
+ parent /= 2;
+ }
+
+ if (i < parent) {
+ dag.putEdge(currentLevel.get(i), nextLevel.get(j));
+ }
+ }
+ }
+ }
+
+ return dag;
+ }
+
+ /**
+ * Test cases for pruneBackwardDag.
+ */
+ private static Stream<Arguments> pruneBackwardDagScenarios() {
+ Set<String> level0Files = new HashSet<>(SST_FILES_BY_LEVEL.get(0));
+ Set<String> level1Files = new HashSet<>(SST_FILES_BY_LEVEL.get(1));
+ Set<String> level2Files = new HashSet<>(SST_FILES_BY_LEVEL.get(2));
+ Set<String> level3Files = new HashSet<>(SST_FILES_BY_LEVEL.get(3));
+
+ level1Files.addAll(level0Files);
+ level2Files.addAll(level1Files);
+ level3Files.addAll(level2Files);
+
+ return Stream.of(
+ Arguments.of("Remove level 0 from backward DAG",
+ createBackwardDagFromLevelNodes(0, 4),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(0)),
+ createBackwardDagFromLevelNodes(1, 4),
+ level0Files
+ ),
+ Arguments.of("Remove level 1 from backward DAG",
+ createBackwardDagFromLevelNodes(0, 4),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(1)),
+ createBackwardDagFromLevelNodes(2, 4),
+ level1Files
+ ),
+ Arguments.of("Remove level 2 from backward DAG",
+ createBackwardDagFromLevelNodes(0, 4),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(2)),
+ createBackwardDagFromLevelNodes(3, 4),
+ level2Files
+ ),
+ Arguments.of("Remove level 3 from backward DAG",
+ createBackwardDagFromLevelNodes(0, 4),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(3)),
+ createBackwardDagFromLevelNodes(4, 4),
+ level3Files
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("pruneBackwardDagScenarios")
+ public void testPruneBackwardDag(String description,
+ MutableGraph<CompactionNode> originalDag,
+ Set<CompactionNode> levelToBeRemoved,
+ MutableGraph<CompactionNode> expectedDag,
+ Set<String> expectedFileNodesRemoved) {
+
+ RocksDBCheckpointDiffer differ =
+ new RocksDBCheckpointDiffer(null, null, null, 0L);
+ Set<String> actualFileNodesRemoved =
+ differ.pruneBackwardDag(originalDag, levelToBeRemoved);
+ Assertions.assertEquals(expectedDag, originalDag);
+ Assertions.assertEquals(expectedFileNodesRemoved, actualFileNodesRemoved);
+ }
+
+
+ /**
+ * Test cases for pruneForwardDag.
+ */
+ private static Stream<Arguments> pruneForwardDagScenarios() {
+ Set<String> level0Files = new HashSet<>(SST_FILES_BY_LEVEL.get(0));
+ Set<String> level1Files = new HashSet<>(SST_FILES_BY_LEVEL.get(1));
+ Set<String> level2Files = new HashSet<>(SST_FILES_BY_LEVEL.get(2));
+ Set<String> level3Files = new HashSet<>(SST_FILES_BY_LEVEL.get(3));
+
+ level1Files.addAll(level0Files);
+ level2Files.addAll(level1Files);
+ level3Files.addAll(level2Files);
+
+ return Stream.of(
+ Arguments.of("Remove level 0 from forward DAG",
+ createForwardDagFromLevelNodes(4, 0),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(0)),
+ createForwardDagFromLevelNodes(4, 1),
+ level0Files
+ ),
+ Arguments.of("Remove level 1 from forward DAG",
+ createForwardDagFromLevelNodes(4, 0),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(1)),
+ createForwardDagFromLevelNodes(4, 2),
+ level1Files
+ ),
+ Arguments.of("Remove level 2 from forward DAG",
+ createForwardDagFromLevelNodes(4, 0),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(2)),
+ createForwardDagFromLevelNodes(4, 3),
+ level2Files
+ ),
+ Arguments.of("Remove level 3 from forward DAG",
+ createForwardDagFromLevelNodes(4, 0),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(3)),
+ createForwardDagFromLevelNodes(4, 4),
+ level3Files
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("pruneForwardDagScenarios")
+ public void testPruneForwardDag(String description,
+ MutableGraph<CompactionNode> originalDag,
+ Set<CompactionNode> levelToBeRemoved,
+ MutableGraph<CompactionNode> expectedDag,
+ Set<String> expectedFileNodesRemoved) {
+
+ RocksDBCheckpointDiffer differ =
+ new RocksDBCheckpointDiffer(null, null, null, 0L);
+ Set<String> actualFileNodesRemoved =
+ differ.pruneForwardDag(originalDag, levelToBeRemoved);
+ Assertions.assertEquals(expectedDag, originalDag);
+ Assertions.assertEquals(expectedFileNodesRemoved, actualFileNodesRemoved);
+ }
+
+ private static Stream<Arguments> compactionDagPruningScenarios() {
+ long currentTimeMillis = System.currentTimeMillis();
+
+ String compactionLogFile0 = "S 1000 snapshotId0 " +
+ (currentTimeMillis - MINUTES.toMillis(30)) + " \n";
+ String compactionLogFile1 = "C 000015,000013,000011,000009:000018,000016," +
+ "000017\n"
+ + "S 2000 snapshotId1 " +
+ (currentTimeMillis - MINUTES.toMillis(24)) + " \n";
+
+ String compactionLogFile2 = "C 000018,000016,000017,000026,000024,000022," +
+ "000020:000027,000030,000028,000031,000029\n"
+ + "S 3000 snapshotId2 " +
+ (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
+
+ String compactionLogFile3 = "C 000027,000030,000028,000031,000029,000039," +
+ "000037,000035,000033:000040,000044,000042,000043,000046,000041," +
+ "000045\n"
+ + "S 3000 snapshotId3 " +
+ (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
+
+ String compactionLogFile4 = "C 000040,000044,000042,000043,000046,000041," +
+ "000045,000054,000052,000050,000048:000059,000055,000056,000060," +
+ "000057,000058\n"
+ + "S 3000 snapshotId4 " +
+ (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
+
+ String compactionLogFileWithoutSnapshot1 = "C 000015,000013,000011," +
+ "000009:000018,000016,000017\n" +
+ "C 000018,000016,000017,000026,000024,000022,000020:000027,000030," +
+ "000028,000031,000029\n";
+
+ String compactionLogFileWithoutSnapshot2 = "C 000027,000030,000028," +
+ "000031,000029,000039,000037,000035,000033:000040,000044,000042," +
+ "000043,000046,000041,000045\n";
+
+ String compactionLogFileWithoutSnapshot3 = "C 000040,000044,000042," +
+ "000043,000046,000041,000045,000054,000052,000050,000048:000059," +
+ "000055,000056,000060,000057,000058\n";
+
+ String compactionLogFileOnlyWithSnapshot1 =
+ "S 3000 snapshotIdWithoutCompaction1 " +
+ (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot2 =
+ "S 3000 snapshotIdWithoutCompaction2 " +
+ (currentTimeMillis - MINUTES.toMillis(15)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot3 =
+ "S 3000 snapshotIdWithoutCompaction3 " +
+ (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot4 =
+ "S 3000 snapshotIdWithoutCompaction4 " +
+ (currentTimeMillis - MINUTES.toMillis(9)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot5 =
+ "S 3000 snapshotIdWithoutCompaction5 " +
+ (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot6 =
+ "S 3000 snapshotIdWithoutCompaction6 " +
+ (currentTimeMillis - MINUTES.toMillis(3)) + " \n";
+
+ Set<String> expectedNodes = new HashSet<>(
+ Arrays.asList("000054", "000052", "000050", "000048", "000059",
+ "000055", "000056", "000060", "000057", "000058")
+ );
+
+ Set<String> expectedAllNodes = new HashSet<>(
+ Arrays.asList("000058", "000013", "000035", "000057", "000056",
+ "000011", "000033", "000055", "000018", "000017", "000039",
+ "000016", "000015", "000037", "000059", "000060", "000043",
+ "000020", "000042", "000041", "000040", "000024", "000046",
+ "000045", "000022", "000044", "000029", "000028", "000027",
+ "000026", "000048", "000009", "000050", "000054", "000031",
+ "000030", "000052")
+ );
+
+ return Stream.of(
+ Arguments.of("Each compaction log file has only one snapshot and one" +
+ " compaction statement except first log file.",
+ Arrays.asList(compactionLogFile0, compactionLogFile1,
+ compactionLogFile2, compactionLogFile3, compactionLogFile4),
+ expectedNodes,
+ 4
+ ),
+ Arguments.of("Compaction log doesn't have snapshot because OM" +
+ " restarted. Restart happened before snapshot to be deleted.",
+ Arrays.asList(compactionLogFile0,
+ compactionLogFileWithoutSnapshot1,
+ compactionLogFile3,
+ compactionLogFile4),
+ expectedNodes,
+ 3
+ ),
+ Arguments.of("Compaction log doesn't have snapshot because OM" +
+ " restarted. Restart happened after snapshot to be deleted.",
+ Arrays.asList(compactionLogFile0, compactionLogFile1,
+ compactionLogFile2, compactionLogFile3,
+ compactionLogFileWithoutSnapshot3,
+ compactionLogFileOnlyWithSnapshot4),
+ expectedNodes,
+ 4
+ ),
+ Arguments.of("No compaction happened in between two snapshots.",
+ Arrays.asList(compactionLogFile0, compactionLogFile1,
+ compactionLogFile2, compactionLogFile3,
+ compactionLogFileOnlyWithSnapshot1,
+ compactionLogFileOnlyWithSnapshot2, compactionLogFile4),
+ expectedNodes,
+ 6
+ ),
+ Arguments.of("No snapshot is taken and only one compaction log file,",
+ Collections.singletonList(compactionLogFileWithoutSnapshot1 +
+ compactionLogFileWithoutSnapshot2 +
+ compactionLogFileWithoutSnapshot3),
+ expectedAllNodes,
+ 0
+ ),
+ Arguments.of("No snapshot is taken but multiple compaction files" +
+ " because of OM restart.",
+ Arrays.asList(compactionLogFileWithoutSnapshot1,
+ compactionLogFileWithoutSnapshot2,
+ compactionLogFileWithoutSnapshot3),
+ expectedAllNodes,
+ 0
+ ),
+ Arguments.of("Only contains snapshots but no compaction.",
+ Arrays.asList(compactionLogFileOnlyWithSnapshot1,
+ compactionLogFileOnlyWithSnapshot2,
+ compactionLogFileOnlyWithSnapshot3,
+ compactionLogFileOnlyWithSnapshot4,
+ compactionLogFileOnlyWithSnapshot5,
+ compactionLogFileOnlyWithSnapshot6),
+ Collections.emptySet(),
+ 3
+ ),
+ Arguments.of("No file exists because compaction has not happened" +
+ " and snapshot is not taken.",
+ Collections.emptyList(),
+ Collections.emptySet(),
+ 0
+ )
+ );
+ }
+
+ /**
+ * End-to-end test for snapshot's compaction history pruning.
+ */
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("compactionDagPruningScenarios")
+ public void testPruneOlderSnapshotsWithCompactionHistory(
+ String description,
+ List<String> compactionLogs,
+ Set<String> expectedNodes,
+ int expectedNumberOfLogFilesDeleted
+ ) throws IOException {
+ String compactionLogDirName = "./test-compaction-log";
+ File compactionLogDir = new File(compactionLogDirName);
+ if (!compactionLogDir.exists() && !compactionLogDir.mkdirs()) {
+ fail("Error creating compaction log directory: " + compactionLogDirName);
+ }
+
+ String sstBackUpDirName = "./test-compaction-sst-backup";
+ File sstBackUpDir = new File(sstBackUpDirName);
+ if (!sstBackUpDir.exists() && !sstBackUpDir.mkdirs()) {
+ fail("Error creating SST backup directory: " + sstBackUpDirName);
+ }
+
+ List<File> filesCreated = new ArrayList<>();
+
+ for (int i = 0; i < compactionLogs.size(); i++) {
+ String compactionFileName =
+ compactionLogDirName + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
+ File compactionFile = new File(compactionFileName);
+ Files.write(compactionFile.toPath(),
+ compactionLogs.get(i).getBytes(UTF_8));
+ filesCreated.add(compactionFile);
+ }
+
+ RocksDBCheckpointDiffer differ =
+ new RocksDBCheckpointDiffer(sstBackUpDirName,
+ compactionLogDirName,
+ null,
+ MINUTES.toMillis(10));
+
+ differ.loadAllCompactionLogs();
+
+ differ.pruneOlderSnapshotsWithCompactionHistory();
+
+ Set<String> actualNodes = differ.getForwardCompactionDAG().nodes().stream()
+ .map(CompactionNode::getFileName)
+ .collect(Collectors.toSet());
+
+ assertEquals(expectedNodes, actualNodes);
+
+ for (int i = 0; i < expectedNumberOfLogFilesDeleted; i++) {
+ File compactionFile = filesCreated.get(i);
+ assertFalse(compactionFile.exists());
+ }
+
+ for (int i = expectedNumberOfLogFilesDeleted; i < compactionLogs.size();
+ i++) {
+ File compactionFile = filesCreated.get(i);
+ assertTrue(compactionFile.exists());
+ }
+
+ deleteDirectory(compactionLogDir);
+ deleteDirectory(sstBackUpDir);
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 45bdafee7d..bbf0939a2a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -150,8 +150,9 @@ public final class OmSnapshotManager {
// Write snapshot generation (latest sequence number) to compaction log.
// This will be used for DAG reconstruction as snapshotGeneration.
- dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber,
- snapshotInfo.getSnapshotID());
+ dbCpDiffer.appendSnapshotInfoToCompactionLog(dbLatestSequenceNumber,
+ snapshotInfo.getSnapshotID(),
+ snapshotInfo.getCreationTime());
// Set compaction log filename to the latest DB sequence number
// right after taking the RocksDB checkpoint for Ozone snapshot.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]