This is an automated email from the ASF dual-hosted git repository.
hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cc773e83e8 HDDS-9311. Use compationLogTable to store compaction
information (#5317)
cc773e83e8 is described below
commit cc773e83e8545c83c36ca583ccec2f8c5d985f70
Author: Hemant Kumar <[email protected]>
AuthorDate: Wed Oct 4 09:20:31 2023 -0700
HDDS-9311. Use compationLogTable to store compaction information (#5317)
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 5 +
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 14 +-
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 596 +++++++---------
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 781 ++++++++++++++-------
.../ozone/om/TestSnapshotBackgroundServices.java | 97 ++-
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 22 -
6 files changed, 834 insertions(+), 681 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 27855d187d..e802de0666 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -606,4 +606,9 @@ public final class OzoneConsts {
*/
public static final String SNAPSHOT_INFO_TABLE = "snapshotInfoTable";
+ /**
+ * DB compaction log table name. Referenced in RDBStore.
+ */
+ public static final String COMPACTION_LOG_TABLE =
+ "compactionLogTable";
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 7ccb01d79f..371e6abf5e 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -49,6 +49,7 @@ import org.rocksdb.TransactionLogIterator.BatchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ozone.OzoneConsts.COMPACTION_LOG_TABLE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
@@ -150,10 +151,15 @@ public class RDBStore implements DBStore {
// Set CF handle in differ to be used in DB listener
rocksDBCheckpointDiffer.setSnapshotInfoTableCFHandle(
ssInfoTableCF.getHandle());
- // Finish the initialization of compaction DAG tracker by setting the
- // sequence number as current compaction log filename.
- rocksDBCheckpointDiffer.setCurrentCompactionLog(
- db.getLatestSequenceNumber());
+ // Set CF handle in differ to be store compaction log entry.
+ ColumnFamily compactionLogTableCF =
+ db.getColumnFamily(COMPACTION_LOG_TABLE);
+ Preconditions.checkNotNull(compactionLogTableCF,
+ "CompactionLogTable column family handle should not be null.");
+ rocksDBCheckpointDiffer.setCompactionLogTableCFHandle(
+ compactionLogTableCF.getHandle());
+ // Set activeRocksDB in differ to access compaction log CF.
+ rocksDBCheckpointDiffer.setActiveRocksDB(db.getManagedRocksDb().get());
// Load all previous compaction logs
rocksDBCheckpointDiffer.loadAllCompactionLogs();
}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index d8fe2d50ab..ebf4473a54 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -30,15 +30,21 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.Scheduler;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.ozone.compaction.log.CompactionFileInfo;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdb.util.RdbUtil;
import org.apache.ozone.graph.PrintableGraph;
import org.apache.ozone.graph.PrintableGraph.GraphType;
@@ -51,6 +57,7 @@ import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
+import org.rocksdb.SstFileReaderIterator;
import org.rocksdb.TableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,7 +84,6 @@ import java.util.stream.Stream;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Arrays.asList;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
@@ -116,16 +122,6 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
private final String compactionLogDir;
- /**
- * Compaction log path for DB compaction history persistence.
- * This is the source of truth for in-memory SST DAG reconstruction upon
- * OM restarts.
- * <p>
- * Initialized to the latest sequence number on OM startup. The log also
rolls
- * over (gets appended to a new file) whenever an Ozone snapshot is taken.
- */
- private volatile String currentCompactionLogPath = null;
-
public static final String COMPACTION_LOG_FILE_NAME_SUFFIX = ".log";
/**
@@ -174,8 +170,8 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
/**
* Used during DAG reconstruction.
*/
- private long reconstructionSnapshotGeneration;
- private String reconstructionLastSnapshotID;
+ private long reconstructionSnapshotCreationTime;
+ private String reconstructionCompactionReason;
private final Scheduler scheduler;
private volatile boolean closed;
@@ -188,6 +184,9 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
private final String dagPruningServiceName = "CompactionDagPruningService";
private AtomicBoolean suspended;
+ private ColumnFamilyHandle compactionLogTableCFHandle;
+ private RocksDB activeRocksDB;
+
/**
* This is a package private constructor and should not be used other than
* testing. Caller should use RocksDBCheckpointDifferHolder#getInstance() to
@@ -303,36 +302,6 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
}
}
- /**
- * Set the current compaction log filename with a given RDB sequence number.
- * @param latestSequenceNum latest sequence number of RDB.
- */
- public void setCurrentCompactionLog(long latestSequenceNum) {
- String latestSequenceIdStr = String.valueOf(latestSequenceNum);
-
- if (latestSequenceIdStr.length() < LONG_MAX_STR_LEN) {
- // Pad zeroes to the left for ordered file listing when sorted
- // alphabetically.
- latestSequenceIdStr =
- StringUtils.leftPad(latestSequenceIdStr, LONG_MAX_STR_LEN, "0");
- }
-
- // Local temp variable for storing the new compaction log file path
- final String newCompactionLog = compactionLogDir + latestSequenceIdStr +
- COMPACTION_LOG_FILE_NAME_SUFFIX;
-
- File clFile = new File(newCompactionLog);
- if (clFile.exists()) {
- LOG.warn("Compaction log exists: {}. Will append", newCompactionLog);
- }
-
- this.currentCompactionLogPath = newCompactionLog;
-
- // Create empty file if it doesn't exist
- appendToCurrentCompactionLog("");
- }
-
-
@Override
public void close() throws Exception {
if (!closed) {
@@ -381,43 +350,6 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
DEBUG_LEVEL.add(level);
}
- /**
- * Append (then flush) to the current compaction log file path.
- * Note: This does NOT automatically append newline to the log.
- */
- private void appendToCurrentCompactionLog(String content) {
- if (currentCompactionLogPath == null) {
- LOG.error("Unable to append compaction log. "
- + "Compaction log path is not set. "
- + "Please check initialization.");
- throw new RuntimeException("Compaction log path not set");
- }
-
- synchronized (this) {
- try (BufferedWriter bw = Files.newBufferedWriter(
- Paths.get(currentCompactionLogPath),
- StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
- bw.write(content);
- bw.flush();
- } catch (IOException e) {
- throw new RuntimeException("Failed to append compaction log to " +
- currentCompactionLogPath, e);
- }
- }
- }
-
- /**
- * Append a sequence number to the compaction log (roughly) when an Ozone
- * snapshot (RDB checkpoint) is taken.
- */
- public void appendSnapshotInfoToCompactionLog(long sequenceNum,
- String snapshotID,
- long creationTime) {
- final String line = COMPACTION_LOG_SEQ_NUM_LINE_PREFIX + sequenceNum +
- SPACE_DELIMITER + snapshotID + SPACE_DELIMITER + creationTime + "\n";
- appendToCurrentCompactionLog(line);
- }
-
/**
* Takes {@link org.rocksdb.Options}.
*/
@@ -457,6 +389,26 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
this.snapshotInfoTableCFHandle = snapshotInfoTableCFHandle;
}
+ /**
+ * Set CompactionLogTable DB column family handle to access the table.
+ * @param compactionLogTableCFHandle ColumnFamilyHandle
+ */
+ public synchronized void setCompactionLogTableCFHandle(
+ ColumnFamilyHandle compactionLogTableCFHandle) {
+ Preconditions.checkNotNull(compactionLogTableCFHandle,
+ "Column family handle should not be null");
+ this.compactionLogTableCFHandle = compactionLogTableCFHandle;
+ }
+
+ /**
+ * Set activeRocksDB to access CompactionLogTable.
+ * @param activeRocksDB RocksDB
+ */
+ public synchronized void setActiveRocksDB(RocksDB activeRocksDB) {
+ Preconditions.checkNotNull(activeRocksDB, "RocksDB should not be null.");
+ this.activeRocksDB = activeRocksDB;
+ }
+
/**
* Helper method to check whether the SnapshotInfoTable column family is
empty
* in a given DB instance.
@@ -548,49 +500,25 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
return;
}
- final StringBuilder sb = new StringBuilder();
+ long trxId = db.getLatestSequenceNumber();
+
+ CompactionLogEntry.Builder builder;
+ try (ManagedOptions options = new ManagedOptions();
+ ManagedReadOptions readOptions = new ManagedReadOptions()) {
+ builder = new CompactionLogEntry.Builder(trxId,
+ System.currentTimeMillis(),
+ toFileInfoList(compactionJobInfo.inputFiles(), options,
+ readOptions),
+ toFileInfoList(compactionJobInfo.outputFiles(), options,
+ readOptions));
+ }
if (LOG.isDebugEnabled()) {
- // Print compaction reason for this entry in the log file
- // e.g. kLevelL0FilesNum / kLevelMaxLevelSize.
- sb.append(COMPACTION_LOG_COMMENT_LINE_PREFIX)
- .append(compactionJobInfo.compactionReason())
- .append('\n');
+ builder = builder.setCompactionReason(
+ compactionJobInfo.compactionReason().toString());
}
- // Mark the beginning of a compaction log
- sb.append(COMPACTION_LOG_ENTRY_LINE_PREFIX);
- sb.append(db.getLatestSequenceNumber());
- sb.append(SPACE_DELIMITER);
-
- // Trim DB path, only keep the SST file name
- final int filenameOffset =
- compactionJobInfo.inputFiles().get(0).lastIndexOf("/") + 1;
-
- // Append the list of input files
- final List<String> inputFiles = compactionJobInfo.inputFiles();
- // Trim the file path, leave only the SST file name without extension
- inputFiles.replaceAll(s -> s.substring(
- filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
- final String inputFilesJoined =
- String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, inputFiles);
- sb.append(inputFilesJoined);
-
- // Insert delimiter between input files and output files
- sb.append(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
-
- // Append the list of output files
- final List<String> outputFiles = compactionJobInfo.outputFiles();
- outputFiles.replaceAll(s -> s.substring(
- filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
- final String outputFilesJoined =
- String.join(COMPACTION_LOG_ENTRY_FILE_DELIMITER, outputFiles);
- sb.append(outputFilesJoined);
-
- // End of line
- sb.append('\n');
-
- String content = sb.toString();
+ CompactionLogEntry compactionLogEntry = builder.build();
synchronized (this) {
if (closed) {
@@ -605,19 +533,49 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
waitForTarballCreation();
- // Write input and output file names to compaction log
- appendToCurrentCompactionLog(content);
+
+ // Add the compaction log entry to Compaction log table.
+ addToCompactionLogTable(compactionLogEntry);
// Populate the DAG
// TODO: [SNAPSHOT] Once SnapshotChainManager is put into use,
// set snapshotID to snapshotChainManager.getLatestGlobalSnapshot()
- populateCompactionDAG(inputFiles, outputFiles, null,
+ populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
+ compactionLogEntry.getOutputFileInfoList(), null,
db.getLatestSequenceNumber());
}
}
};
}
+ @VisibleForTesting
+ void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) {
+ String dbSequenceIdStr =
+ String.valueOf(compactionLogEntry.getDbSequenceNumber());
+
+ if (dbSequenceIdStr.length() < LONG_MAX_STR_LEN) {
+ // Pad zeroes to the left to make sure it is lexicographic ordering.
+ dbSequenceIdStr = org.apache.commons.lang3.StringUtils.leftPad(
+ dbSequenceIdStr, LONG_MAX_STR_LEN, "0");
+ }
+
+ // Key in the transactionId-currentTime
+ // Just trxId can't be used because multiple compaction might be
+ // running, and it is possible no new entry was added to DB.
+ // Adding current time to transactionId eliminates key collision.
+ String keyString = dbSequenceIdStr + "-" +
+ compactionLogEntry.getCompactionTime();
+
+ byte[] key = keyString.getBytes(UTF_8);
+ byte[] value = compactionLogEntry.getProtobuf().toByteArray();
+ try {
+ activeRocksDB.put(compactionLogTableCFHandle, key, value);
+ } catch (RocksDBException exception) {
+ // TODO: Revisit exception handling before merging the PR.
+ throw new RuntimeException(exception);
+ }
+ }
+
/**
* Check if there is any in_progress tarball creation request and wait till
* all tarball creation finish, and it gets notified.
@@ -740,20 +698,20 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
}
/**
- * Process each line of compaction log text file input and populate the DAG.
+ * Process log line of compaction log text file input and populate the DAG.
+ * It also adds the compaction log entry to compaction log table.
*/
void processCompactionLogLine(String line) {
LOG.debug("Processing line: {}", line);
synchronized (this) {
- if (line.startsWith("#")) {
- // Skip comments
- LOG.debug("Comment line, skipped");
+ if (line.startsWith(COMPACTION_LOG_COMMENT_LINE_PREFIX)) {
+ reconstructionCompactionReason =
+ line.substring(COMPACTION_LOG_COMMENT_LINE_PREFIX.length());
} else if (line.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
- SnapshotLogInfo snapshotLogInfo = getSnapshotLogInfo(line);
- reconstructionSnapshotGeneration =
snapshotLogInfo.snapshotGenerationId;
- reconstructionLastSnapshotID = snapshotLogInfo.snapshotId;
+ reconstructionSnapshotCreationTime =
+ getSnapshotCreationTimeFromLogLine(line);
} else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
// Compaction log entry is like following:
// C sequence_number input_files:output_files
@@ -764,6 +722,7 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
return;
}
+ String dbSequenceNumber = lineSpilt[1];
String[] io = lineSpilt[2]
.split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
@@ -778,18 +737,21 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
String[] inputFiles = io[0].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
String[] outputFiles =
io[1].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
- populateCompactionDAG(asList(inputFiles), asList(outputFiles),
- reconstructionLastSnapshotID, reconstructionSnapshotGeneration);
+ addFileInfoToCompactionLogTable(Long.parseLong(dbSequenceNumber),
+ reconstructionSnapshotCreationTime, inputFiles, outputFiles,
+ reconstructionCompactionReason);
} else {
LOG.error("Invalid line in compaction log: {}", line);
}
}
}
+
/**
- * Helper to read compaction log to the internal DAG.
+ * Helper to read compaction log file to the internal DAG and compaction log
+ * table.
*/
- private void readCompactionLogToDAG(String currCompactionLogPath) {
+ private void readCompactionLogFile(String currCompactionLogPath) {
LOG.debug("Loading compaction log: {}", currCompactionLogPath);
try (Stream<String> logLineStream =
Files.lines(Paths.get(currCompactionLogPath), UTF_8)) {
@@ -799,24 +761,20 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
}
}
- /**
- * Load existing compaction log files to the in-memory DAG.
- * This only needs to be done once during OM startup.
- */
- public void loadAllCompactionLogs() {
+ public void addEntriesFromLogFilesToDagAndCompactionLogTable() {
synchronized (this) {
- if (compactionLogDir == null) {
- throw new RuntimeException("Compaction log directory must be set " +
- "first");
- }
- reconstructionSnapshotGeneration = 0L;
+ reconstructionSnapshotCreationTime = 0L;
+ reconstructionCompactionReason = null;
try {
try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
.filter(e -> e.toString().toLowerCase()
.endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
.sorted()) {
for (Path logPath : pathStream.collect(Collectors.toList())) {
- readCompactionLogToDAG(logPath.toString());
+ readCompactionLogFile(logPath.toString());
+ // Delete the file once entries are added to compaction table
+ // so that on next restart, only compaction log table is used.
+ Files.delete(logPath);
}
}
} catch (IOException e) {
@@ -826,6 +784,43 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
}
}
+ /**
+ * Load existing compaction log from table to the in-memory DAG.
+ * This only needs to be done once during OM startup.
+ */
+ public void loadAllCompactionLogs() {
+ synchronized (this) {
+ preconditionChecksForLoadAllCompactionLogs();
+ addEntriesFromLogFilesToDagAndCompactionLogTable();
+ try (ManagedRocksIterator managedRocksIterator = new
ManagedRocksIterator(
+ activeRocksDB.newIterator(compactionLogTableCFHandle))) {
+ managedRocksIterator.get().seekToFirst();
+ while (managedRocksIterator.get().isValid()) {
+ byte[] value = managedRocksIterator.get().value();
+ CompactionLogEntry compactionLogEntry =
+ CompactionLogEntry.getFromProtobuf(
+ CompactionLogEntryProto.parseFrom(value));
+ populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
+ compactionLogEntry.getOutputFileInfoList(),
+ null,
+ compactionLogEntry.getDbSequenceNumber());
+ managedRocksIterator.get().next();
+ }
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void preconditionChecksForLoadAllCompactionLogs() {
+ Preconditions.checkNotNull(compactionLogDir,
+ "Compaction log directory must be set.");
+ Preconditions.checkNotNull(compactionLogTableCFHandle,
+ "compactionLogTableCFHandle must be set before calling " +
+ "loadAllCompactionLogs.");
+ Preconditions.checkNotNull(activeRocksDB,
+ "activeRocksDB must be set before calling loadAllCompactionLogs.");
+ }
/**
* Helper function that prepends SST file name with SST backup directory path
* (or DB checkpoint path if compaction hasn't happened yet as SST files
won't
@@ -1096,12 +1091,12 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
}
@VisibleForTesting
- MutableGraph<CompactionNode> getForwardCompactionDAG() {
+ public MutableGraph<CompactionNode> getForwardCompactionDAG() {
return forwardCompactionDAG;
}
@VisibleForTesting
- MutableGraph<CompactionNode> getBackwardCompactionDAG() {
+ public MutableGraph<CompactionNode> getBackwardCompactionDAG() {
return backwardCompactionDAG;
}
@@ -1135,20 +1130,24 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
* arbitrary String as long as it helps debugging.
* @param seqNum DB transaction sequence number.
*/
- private void populateCompactionDAG(List<String> inputFiles,
- List<String> outputFiles, String snapshotId, long seqNum) {
+ private void populateCompactionDAG(List<CompactionFileInfo> inputFiles,
+ List<CompactionFileInfo> outputFiles,
+ String snapshotId,
+ long seqNum) {
if (LOG.isDebugEnabled()) {
LOG.debug("Input files: {} -> Output files: {}", inputFiles,
outputFiles);
}
- for (String outfile : outputFiles) {
+ for (CompactionFileInfo outfile : outputFiles) {
final CompactionNode outfileNode = compactionNodeMap.computeIfAbsent(
- outfile, file -> addNodeToDAG(file, snapshotId, seqNum));
+ outfile.getFileName(),
+ file -> addNodeToDAG(file, snapshotId, seqNum));
- for (String infile : inputFiles) {
+ for (CompactionFileInfo infile : inputFiles) {
final CompactionNode infileNode = compactionNodeMap.computeIfAbsent(
- infile, file -> addNodeToDAG(file, snapshotId, seqNum));
+ infile.getFileName(),
+ file -> addNodeToDAG(file, snapshotId, seqNum));
// Draw the edges
if (!outfileNode.getFileName().equals(infileNode.getFileName())) {
forwardCompactionDAG.putEdge(outfileNode, infileNode);
@@ -1156,7 +1155,30 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
}
}
}
+ }
+ private void addFileInfoToCompactionLogTable(
+ long dbSequenceNumber,
+ long creationTime,
+ String[] inputFiles,
+ String[] outputFiles,
+ String compactionReason
+ ) {
+ List<CompactionFileInfo> inputFileInfoList = Arrays.stream(inputFiles)
+ .map(inputFile -> new CompactionFileInfo.Builder(inputFile).build())
+ .collect(Collectors.toList());
+ List<CompactionFileInfo> outputFileInfoList = Arrays.stream(outputFiles)
+ .map(outputFile -> new CompactionFileInfo.Builder(outputFile).build())
+ .collect(Collectors.toList());
+
+ CompactionLogEntry.Builder builder =
+ new CompactionLogEntry.Builder(dbSequenceNumber, creationTime,
+ inputFileInfoList, outputFileInfoList);
+ if (compactionReason != null) {
+ builder.setCompactionReason(compactionReason);
+ }
+
+ addToCompactionLogTable(builder.build());
}
/**
@@ -1169,11 +1191,10 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
if (!shouldRun()) {
return;
}
-
- List<Path> olderSnapshotsLogFilePaths =
- getOlderSnapshotsCompactionLogFilePaths();
- List<String> lastCompactionSstFiles =
- getLastCompactionSstFiles(olderSnapshotsLogFilePaths);
+ Pair<Set<String>, List<byte[]>> fileNodeToKeyPair =
+ getOlderFileNodes();
+ Set<String> lastCompactionSstFiles = fileNodeToKeyPair.getLeft();
+ List<byte[]> keysToRemove = fileNodeToKeyPair.getRight();
Set<String> sstFileNodesRemoved =
pruneSstFileNodesFromDag(lastCompactionSstFiles);
@@ -1185,85 +1206,72 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
removeSstFiles(sstFileNodesRemoved);
- deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+ removeKeyFromCompactionLogTable(keysToRemove);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
- /**
- * Deletes the SST files from the backup directory if exists.
- */
- private void removeSstFiles(Set<String> sstFileNodes) {
- for (String sstFileNode: sstFileNodes) {
- File file =
- new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
- try {
- Files.deleteIfExists(file.toPath());
- } catch (IOException exception) {
- LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
- }
- }
- }
/**
- * Returns the list of compaction log files which are older than allowed
- * max time in the compaction DAG.
+ * Returns the list of input files from the compaction entries which are
+ * older than the maximum allowed in the compaction DAG.
*/
- private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+ private synchronized Pair<Set<String>, List<byte[]>> getOlderFileNodes() {
long compactionLogPruneStartTime = System.currentTimeMillis();
+ Set<String> compactionNodes = new HashSet<>();
+ List<byte[]> keysToRemove = new ArrayList<>();
+
+ try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator(
+ activeRocksDB.newIterator(compactionLogTableCFHandle))) {
+ managedRocksIterator.get().seekToFirst();
+ while (managedRocksIterator.get().isValid()) {
+ CompactionLogEntry compactionLogEntry = CompactionLogEntry
+ .getFromProtobuf(CompactionLogEntryProto
+ .parseFrom(managedRocksIterator.get().value()));
+
+ if (compactionLogPruneStartTime -
+ compactionLogEntry.getCompactionTime() < maxAllowedTimeInDag) {
+ break;
+ }
- List<Path> compactionFiles =
- listCompactionLogFileFromCompactionLogDirectory();
-
- int index = compactionFiles.size() - 1;
- for (; index >= 0; index--) {
- Path compactionLogPath = compactionFiles.get(index);
- SnapshotLogInfo snapshotLogInfo =
- getSnapshotInfoFromLog(compactionLogPath);
-
- if (snapshotLogInfo == null) {
- continue;
- }
+ compactionLogEntry.getInputFileInfoList()
+ .forEach(inputFileInfo ->
+ compactionNodes.add(inputFileInfo.getFileName()));
+ keysToRemove.add(managedRocksIterator.get().key());
+ managedRocksIterator.get().next();
- if (compactionLogPruneStartTime - snapshotLogInfo.snapshotCreatedAt >
- maxAllowedTimeInDag) {
- break;
}
+ } catch (InvalidProtocolBufferException exception) {
+ // TODO: Handle this properly before merging the PR.
+ throw new RuntimeException(exception);
}
+ return Pair.of(compactionNodes, keysToRemove);
+ }
- if (index >= 0) {
- return compactionFiles.subList(0, index + 1);
- } else {
- return Collections.emptyList();
+ private synchronized void removeKeyFromCompactionLogTable(
+ List<byte[]> keysToRemove) {
+ try {
+ for (byte[] key: keysToRemove) {
+ activeRocksDB.delete(compactionLogTableCFHandle, key);
+ }
+ } catch (RocksDBException exception) {
+ // TODO Handle exception properly before merging the PR.
+ throw new RuntimeException(exception);
}
}
/**
- * Returns the list of compaction log file path from compaction log
directory.
+ * Deletes the SST files from the backup directory if exists.
*/
- private List<Path> listCompactionLogFileFromCompactionLogDirectory() {
- try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
- .filter(e -> e.toString().toLowerCase()
- .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
- .sorted()) {
- return pathStream.collect(Collectors.toList());
- } catch (IOException e) {
- throw new RuntimeException("Error listing compaction log dir " +
- compactionLogDir, e);
- }
- }
-
- public void deleteOlderSnapshotsCompactionFiles(
- List<Path> olderSnapshotsLogFilePaths) {
-
- for (int i = 0; i < olderSnapshotsLogFilePaths.size(); i++) {
- Path olderSnapshotsLogFilePath = olderSnapshotsLogFilePaths.get(i);
+ private void removeSstFiles(Set<String> sstFileNodes) {
+ for (String sstFileNode: sstFileNodes) {
+ File file =
+ new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
try {
- Files.deleteIfExists(olderSnapshotsLogFilePath);
+ Files.deleteIfExists(file.toPath());
} catch (IOException exception) {
- LOG.error("Failed to deleted SST file: {}", olderSnapshotsLogFilePath,
- exception);
+ LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
}
}
}
@@ -1272,7 +1280,7 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
* Prunes forward and backward DAGs when oldest snapshot with compaction
* history gets deleted.
*/
- public Set<String> pruneSstFileNodesFromDag(List<String> sstFileNodes) {
+ public Set<String> pruneSstFileNodesFromDag(Set<String> sstFileNodes) {
Set<CompactionNode> startNodes = new HashSet<>();
for (String sstFileNode : sstFileNodes) {
CompactionNode infileNode = compactionNodeMap.get(sstFileNode);
@@ -1353,29 +1361,7 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
return removedFiles;
}
- private SnapshotLogInfo getSnapshotInfoFromLog(Path compactionLogFile) {
- AtomicReference<SnapshotLogInfo> snapshotLogInfo =
- new AtomicReference<>();
- try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
- logStream.forEach(logLine -> {
- if (!logLine.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
- return;
- }
-
- snapshotLogInfo.set(getSnapshotLogInfo(logLine));
- });
- } catch (IOException exception) {
- throw new RuntimeException("Failed to read compaction log file: " +
- compactionLogFile, exception);
- }
-
- return snapshotLogInfo.get();
- }
-
- /**
- * Converts a snapshot compaction log line to SnapshotLogInfo.
- */
- private SnapshotLogInfo getSnapshotLogInfo(String logLine) {
+ private long getSnapshotCreationTimeFromLogLine(String logLine) {
// Remove `S ` from the line.
String line =
logLine.substring(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX.length());
@@ -1384,74 +1370,7 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
Preconditions.checkArgument(splits.length == 3,
"Snapshot info log statement has more than expected parameters.");
- return new SnapshotLogInfo(Long.parseLong(splits[0]),
- splits[1],
- Long.parseLong(splits[2]));
- }
-
- /**
- * Returns the list of SST files got compacted in the last compaction from
- * the provided list of compaction log files.
- * We can't simply use last file from the list because it is possible that
- * no compaction happened between the last snapshot and previous to that.
- * Hence, we go over the list in reverse order and return the SST files from
- * first the compaction happened in the reverse list.
- * If no compaction happen at all, it returns empty list.
- */
- private List<String> getLastCompactionSstFiles(
- List<Path> compactionLogFiles
- ) {
-
- if (compactionLogFiles.isEmpty()) {
- return Collections.emptyList();
- }
- compactionLogFiles = new ArrayList<>(compactionLogFiles);
- Collections.reverse(compactionLogFiles);
-
- for (Path compactionLogFile: compactionLogFiles) {
- List<String> sstFiles = getLastCompactionSstFiles(compactionLogFile);
- if (!sstFiles.isEmpty()) {
- return sstFiles;
- }
- }
-
- return Collections.emptyList();
- }
-
- private List<String> getLastCompactionSstFiles(Path compactionLogFile) {
-
- AtomicReference<String> sstFiles = new AtomicReference<>();
-
- try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
- logStream.forEach(logLine -> {
- if (!logLine.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
- return;
- }
- sstFiles.set(logLine);
- });
- } catch (IOException exception) {
- throw new RuntimeException("Failed to read file: " + compactionLogFile,
- exception);
- }
-
- String lastCompactionLogEntry = sstFiles.get();
-
- if (StringUtils.isEmpty(lastCompactionLogEntry)) {
- return Collections.emptyList();
- }
-
- // Trim the beginning
- lastCompactionLogEntry = lastCompactionLogEntry
- .substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
-
- String[] io = lastCompactionLogEntry
- .split(COMPACTION_LOG_ENTRY_INPUT_OUTPUT_FILES_DELIMITER);
-
- assert (io.length == 2);
-
- String[] outputFiles = io[1].split(COMPACTION_LOG_ENTRY_FILE_DELIMITER);
-
- return asList(outputFiles);
+ return Long.parseLong(splits[2]);
}
public String getSSTBackupDir() {
@@ -1462,20 +1381,6 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
return compactionLogDir;
}
- private static final class SnapshotLogInfo {
- private final long snapshotGenerationId;
- private final String snapshotId;
- private final long snapshotCreatedAt;
-
- private SnapshotLogInfo(long snapshotGenerationId,
- String snapshotId,
- long snapshotCreatedAt) {
- this.snapshotGenerationId = snapshotGenerationId;
- this.snapshotId = snapshotId;
- this.snapshotCreatedAt = snapshotCreatedAt;
- }
- }
-
/**
* Defines the task that removes SST files from backup directory which are
* not needed to generate snapshot diff using compaction DAG to clean
@@ -1537,11 +1442,6 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
return LOG;
}
- @VisibleForTesting
- public String getCurrentCompactionLogPath() {
- return currentCompactionLogPath;
- }
-
@VisibleForTesting
public ConcurrentHashMap<String, CompactionNode> getCompactionNodeMap() {
return compactionNodeMap;
@@ -1610,4 +1510,44 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
graph.generateImage(filePath);
}
+
+ private List<CompactionFileInfo> toFileInfoList(
+ List<String> sstFiles,
+ ManagedOptions options,
+ ManagedReadOptions readOptions
+ ) {
+ if (CollectionUtils.isEmpty(sstFiles)) {
+ return Collections.emptyList();
+ }
+
+ final int fileNameOffset = sstFiles.get(0).lastIndexOf("/") + 1;
+ List<CompactionFileInfo> response = new ArrayList<>();
+
+ for (String sstFile : sstFiles) {
+ String fileName = sstFile.substring(fileNameOffset,
+ sstFile.length() - SST_FILE_EXTENSION_LENGTH);
+ SstFileReader fileReader = new SstFileReader(options);
+ try {
+ fileReader.open(sstFile);
+ String columnFamily = StringUtils.bytes2String(
+ fileReader.getTableProperties().getColumnFamilyName());
+ SstFileReaderIterator iterator = fileReader.newIterator(readOptions);
+ iterator.seekToFirst();
+ String startKey = StringUtils.bytes2String(iterator.key());
+ iterator.seekToLast();
+ String endKey = StringUtils.bytes2String(iterator.key());
+
+ CompactionFileInfo fileInfo = new CompactionFileInfo.Builder(fileName)
+ .setStartRange(startKey)
+ .setEndRange(endKey)
+ .setColumnFamily(columnFamily)
+ .build();
+ response.add(fileInfo);
+ } catch (RocksDBException rocksDBException) {
+ throw new RuntimeException("Failed to read SST file: " + sstFile,
+ rocksDBException);
+ }
+ }
+ return response;
+ }
}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index d7022dde89..18118b142e 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MINUTES;
+import com.google.common.collect.ImmutableSet;
import com.google.common.graph.GraphBuilder;
import java.io.File;
import java.io.FileOutputStream;
@@ -51,8 +52,12 @@ import com.google.common.graph.MutableGraph;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.ozone.compaction.log.CompactionFileInfo;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -82,6 +87,7 @@ import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTI
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.util.Time.now;
import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
@@ -121,9 +127,13 @@ public class TestRocksDBCheckpointDiffer {
private File sstBackUpDir;
private ConfigurationSource config;
private ExecutorService executorService = Executors.newCachedThreadPool();
+ private RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
+ private RocksDB activeRocksDB;
+ private ColumnFamilyHandle keyTableCFHandle;
+ private ColumnFamilyHandle compactionLogTableCFHandle;
@BeforeEach
- public void init() {
+ public void init() throws RocksDBException {
// Checkpoint differ log level. Set to DEBUG for verbose output
GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.getLog(), Level.INFO);
// Test class log level. Set to DEBUG for verbose output
@@ -152,6 +162,30 @@ public class TestRocksDBCheckpointDiffer {
OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL,
OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS)).thenReturn(0L);
+
+ rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(metadataDirName,
+ sstBackUpDirName,
+ compactionLogDirName,
+ activeDbDirName,
+ config);
+
+ ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
+ .optimizeUniversalStyleCompaction();
+ List<ColumnFamilyDescriptor> cfDescriptors = getCFDescriptorList(cfOpts);
+ List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+ DBOptions dbOptions = new DBOptions()
+ .setCreateIfMissing(true)
+ .setCreateMissingColumnFamilies(true);
+
+ rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
+ activeRocksDB = RocksDB.open(dbOptions, activeDbDirName, cfDescriptors,
+ cfHandles);
+ keyTableCFHandle = cfHandles.get(1);
+ compactionLogTableCFHandle = cfHandles.get(4);
+
+ rocksDBCheckpointDiffer.setCompactionLogTableCFHandle(cfHandles.get(4));
+ rocksDBCheckpointDiffer.setActiveRocksDB(activeRocksDB);
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
}
private void createDir(File file, String filePath) {
@@ -168,6 +202,10 @@ public class TestRocksDBCheckpointDiffer {
@AfterEach
public void cleanUp() {
+ IOUtils.closeQuietly(rocksDBCheckpointDiffer);
+ IOUtils.closeQuietly(keyTableCFHandle);
+ IOUtils.closeQuietly(compactionLogTableCFHandle);
+ IOUtils.closeQuietly(activeRocksDB);
deleteDirectory(compactionLogDir);
deleteDirectory(sstBackUpDir);
deleteDirectory(metadataDirDir);
@@ -177,8 +215,77 @@ public class TestRocksDBCheckpointDiffer {
/**
* Test cases for testGetSSTDiffListWithoutDB.
*/
+ @SuppressWarnings("methodlength")
private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
+ String compactionLog =
+ // Snapshot 0
+ "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5 " + now() + " \n"
+ // Additional "compaction" to trigger and test early exit condition
+ + "C 1291 000001,000002:000062\n"
+ // Snapshot 1
+ + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5 " + now() + " \n"
+ // Regular compaction
+ + "C 4023 000068,000062:000069\n"
+ // Trivial move
+ + "C 5647
000071,000064,000060,000052:000071,000064,000060,000052\n"
+ + "C 7658 000073,000066:000074\n"
+ + "C 7872 000082,000076,000069:000083\n"
+ + "C 9001 000087,000080,000074:000088\n"
+ // Deletion?
+ + "C 12755 000093,000090,000083:\n"
+ // Snapshot 2
+ + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd " + now() + "\n"
+ + "C 16192 000098,000096,000085,000078,000071,000064,000060,000052"
+ + ":000099\n"
+ + "C 16762 000105,000095,000088:000107\n"
+ // Snapshot 3
+ + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea " + now() + "\n";
+
+ List<CompactionLogEntry> compactionLogEntries = Arrays.asList(
+ // Additional "compaction" to trigger and test early exit condition
+ createCompactionEntry(1291,
+ now(),
+ Arrays.asList("000001", "000002"),
+ Collections.singletonList("000062")),
+ // Regular compaction
+ createCompactionEntry(4023,
+ now(),
+ Arrays.asList("000068", "000062"),
+ Collections.singletonList("000069")),
+ // Trivial move
+ createCompactionEntry(5547,
+ now(),
+ Arrays.asList("000071", "000064", "000060", "000052"),
+ Arrays.asList("000071", "000064", "000060", "000062")),
+ createCompactionEntry(5647,
+ now(),
+ Arrays.asList("000073", "000066"),
+ Collections.singletonList("000074")),
+ createCompactionEntry(7872,
+ now(),
+ Arrays.asList("000082", "000076", "000069"),
+ Collections.singletonList("000083")),
+ createCompactionEntry(9001,
+ now(),
+ Arrays.asList("000087", "000080", "000074"),
+ Collections.singletonList("000088")),
+ // Deletion
+ createCompactionEntry(12755,
+ now(),
+ Arrays.asList("000093", "000090", "000083"),
+ Collections.emptyList()),
+ createCompactionEntry(16192,
+ now(),
+ Arrays.asList("000098", "000096", "000085", "000078", "000071",
+ "000064", "000060", "000052"),
+ Collections.singletonList("000099")),
+ createCompactionEntry(16762,
+ now(),
+ Arrays.asList("000105", "000095", "000088"),
+ Collections.singletonList("000107"))
+ );
+
DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo(
"/path/to/dbcp1", UUID.randomUUID(), 3008L, null, null);
DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo(
@@ -188,72 +295,136 @@ public class TestRocksDBCheckpointDiffer {
DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
"/path/to/dbcp4", UUID.randomUUID(), 18000L, null, null);
- Set<String> snapshotSstFiles1 = new HashSet<>(asList(
- "000059", "000053"));
- Set<String> snapshotSstFiles2 = new HashSet<>(asList(
- "000088", "000059", "000053", "000095"));
- Set<String> snapshotSstFiles3 = new HashSet<>(asList(
- "000088", "000105", "000059", "000053", "000095"));
- Set<String> snapshotSstFiles4 = new HashSet<>(asList(
- "000088", "000105", "000059", "000053", "000095", "000108"));
- Set<String> snapshotSstFiles1Alt1 = new HashSet<>(asList(
- "000059", "000053", "000066"));
- Set<String> snapshotSstFiles1Alt2 = new HashSet<>(asList(
- "000059", "000053", "000052"));
- Set<String> snapshotSstFiles2Alt2 = new HashSet<>(asList(
- "000088", "000059", "000053", "000095", "000099"));
- Set<String> snapshotSstFiles2Alt3 = new HashSet<>(asList(
- "000088", "000059", "000053", "000062"));
+ Set<String> snapshotSstFiles1 = ImmutableSet.of("000059", "000053");
+ Set<String> snapshotSstFiles2 = ImmutableSet.of("000088", "000059",
+ "000053", "000095");
+ Set<String> snapshotSstFiles3 = ImmutableSet.of("000088", "000105",
+ "000059", "000053", "000095");
+ Set<String> snapshotSstFiles4 = ImmutableSet.of("000088", "000105",
+ "000059", "000053", "000095", "000108");
+ Set<String> snapshotSstFiles1Alt1 = ImmutableSet.of("000059", "000053",
+ "000066");
+ Set<String> snapshotSstFiles1Alt2 = ImmutableSet.of("000059", "000053",
+ "000052");
+ Set<String> snapshotSstFiles2Alt2 = ImmutableSet.of("000088", "000059",
+ "000053", "000095", "000099");
+ Set<String> snapshotSstFiles2Alt3 = ImmutableSet.of("000088", "000059",
+ "000053", "000062");
return Stream.of(
- Arguments.of("Test 1: Regular case. Expands expandable " +
- "SSTs in the initial diff.",
+ Arguments.of("Test 1: Compaction log file regular case. " +
+ " Expands expandable SSTs in the initial diff.",
+ compactionLog,
+ null,
+ snapshotInfo3,
+ snapshotInfo1,
+ snapshotSstFiles3,
+ snapshotSstFiles1,
+ ImmutableSet.of("000059", "000053"),
+ ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
+ "000095"),
+ false),
+ Arguments.of("Test 2: Compaction log file crafted input: " +
+ "One source ('to' snapshot) SST file is never compacted " +
+ "(newly flushed)",
+ compactionLog,
+ null,
+ snapshotInfo4,
+ snapshotInfo3,
+ snapshotSstFiles4,
+ snapshotSstFiles3,
+ ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
+ ImmutableSet.of("000108"),
+ false),
+ Arguments.of("Test 3: Compaction log file crafted input: " +
+ "Same SST files found during SST expansion",
+ compactionLog,
+ null,
+ snapshotInfo2,
+ snapshotInfo1,
+ snapshotSstFiles2,
+ snapshotSstFiles1Alt1,
+ ImmutableSet.of("000066", "000059", "000053"),
+ ImmutableSet.of("000080", "000087", "000073", "000095"),
+ false),
+ Arguments.of("Test 4: Compaction log file crafted input: " +
+ "Skipping known processed SST.",
+ compactionLog,
+ null,
+ snapshotInfo2,
+ snapshotInfo1,
+ snapshotSstFiles2Alt2,
+ snapshotSstFiles1Alt2,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ true),
+ Arguments.of("Test 5: Compaction log file hit snapshot" +
+ " generation early exit condition",
+ compactionLog,
+ null,
+ snapshotInfo2,
+ snapshotInfo1,
+ snapshotSstFiles2Alt3,
+ snapshotSstFiles1,
+ ImmutableSet.of("000059", "000053"),
+ ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
+ false),
+ Arguments.of("Test 6: Compaction log table regular case. " +
+ "Expands expandable SSTs in the initial diff.",
+ null,
+ compactionLogEntries,
snapshotInfo3,
snapshotInfo1,
snapshotSstFiles3,
snapshotSstFiles1,
- new HashSet<>(asList("000059", "000053")),
- new HashSet<>(asList(
- "000066", "000105", "000080", "000087", "000073", "000095")),
+ ImmutableSet.of("000059", "000053"),
+ ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
+ "000095"),
false),
- Arguments.of("Test 2: Crafted input: One source " +
- "('to' snapshot) SST file is never compacted (newly flushed)",
+ Arguments.of("Test 7: Compaction log table crafted input: " +
+ "One source ('to' snapshot) SST file is never compacted " +
+ "(newly flushed)",
+ null,
+ compactionLogEntries,
snapshotInfo4,
snapshotInfo3,
snapshotSstFiles4,
snapshotSstFiles3,
- new HashSet<>(asList(
- "000088", "000105", "000059", "000053", "000095")),
- new HashSet<>(asList("000108")),
+ ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
+ ImmutableSet.of("000108"),
false),
- Arguments.of("Test 3: Crafted input: Same SST files " +
- "found during SST expansion",
+ Arguments.of("Test 8: Compaction log table crafted input: " +
+ "Same SST files found during SST expansion",
+ null,
+ compactionLogEntries,
snapshotInfo2,
snapshotInfo1,
snapshotSstFiles2,
snapshotSstFiles1Alt1,
- new HashSet<>(asList("000066", "000059", "000053")),
- new HashSet<>(asList(
- "000080", "000087", "000073", "000095")),
+ ImmutableSet.of("000066", "000059", "000053"),
+ ImmutableSet.of("000080", "000087", "000073", "000095"),
false),
- Arguments.of("Test 4: Crafted input: Skipping known " +
- "processed SST.",
+ Arguments.of("Test 9: Compaction log table crafted input: " +
+ "Skipping known processed SST.",
+ null,
+ compactionLogEntries,
snapshotInfo2,
snapshotInfo1,
snapshotSstFiles2Alt2,
snapshotSstFiles1Alt2,
- new HashSet<>(),
- new HashSet<>(),
+ Collections.emptySet(),
+ Collections.emptySet(),
true),
- Arguments.of("Test 5: Hit snapshot generation early exit " +
- "condition",
+ Arguments.of("Test 10: Compaction log table hit snapshot " +
+ "generation early exit condition",
+ null,
+ compactionLogEntries,
snapshotInfo2,
snapshotInfo1,
snapshotSstFiles2Alt3,
snapshotSstFiles1,
- new HashSet<>(asList("000059", "000053")),
- new HashSet<>(asList(
- "000066", "000080", "000087", "000073", "000062")),
+ ImmutableSet.of("000059", "000053"),
+ ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
false)
);
}
@@ -266,6 +437,8 @@ public class TestRocksDBCheckpointDiffer {
@MethodSource("casesGetSSTDiffListWithoutDB")
@SuppressWarnings("parameternumber")
public void testGetSSTDiffListWithoutDB(String description,
+ String compactionLog,
+ List<CompactionLogEntry> compactionLogEntries,
DifferSnapshotInfo srcSnapshot,
DifferSnapshotInfo destSnapshot,
Set<String> srcSnapshotSstFiles,
@@ -274,55 +447,33 @@ public class TestRocksDBCheckpointDiffer {
Set<String> expectedDiffSstFiles,
boolean expectingException) {
- RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(metadataDirName,
- sstBackUpDirName,
- compactionLogDirName,
- activeDbDirName,
- config);
boolean exceptionThrown = false;
- long createdTime = System.currentTimeMillis();
- String compactionLog = ""
- // Snapshot 0
- + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5 " + createdTime + " \n"
- // Additional "compaction" to trigger and test early exit condition
- + "C 1291 000001,000002:000062\n"
- // Snapshot 1
- + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5 " + createdTime + " \n"
- // Regular compaction
- + "C 4023 000068,000062:000069\n"
- // Trivial move
- + "C 5647 000071,000064,000060,000052:000071,000064,000060,000052\n"
- + "C 7658 000073,000066:000074\n"
- + "C 7872 000082,000076,000069:000083\n"
- + "C 9001 000087,000080,000074:000088\n"
- // Deletion?
- + "C 12755 000093,000090,000083:\n"
- // Snapshot 2
- + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd " + createdTime + "\n"
- + "C 16192 000098,000096,000085,000078,000071,000064,000060,000052"
- + ":000099\n"
- + "C 16762 000105,000095,000088:000107\n"
- // Snapshot 3
- + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea " + createdTime + "\n";
-
- // Construct DAG from compaction log input
- Arrays.stream(compactionLog.split("\n")).forEach(
- differ::processCompactionLogLine);
+ if (compactionLog != null) {
+ // Construct DAG from compaction log input
+ Arrays.stream(compactionLog.split("\n")).forEach(
+ rocksDBCheckpointDiffer::processCompactionLogLine);
+ } else if (compactionLogEntries != null) {
+ compactionLogEntries.forEach(entry ->
+ rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+ } else {
+ throw new IllegalArgumentException("One of compactionLog and " +
+ "compactionLogEntries should be non-null.");
+ }
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
Set<String> actualSameSstFiles = new HashSet<>();
Set<String> actualDiffSstFiles = new HashSet<>();
try {
- differ.internalGetSSTDiffList(
- srcSnapshot,
- destSnapshot,
- srcSnapshotSstFiles,
- destSnapshotSstFiles,
- differ.getForwardCompactionDAG(),
- actualSameSstFiles,
- actualDiffSstFiles);
+ rocksDBCheckpointDiffer.internalGetSSTDiffList(
+ srcSnapshot,
+ destSnapshot,
+ srcSnapshotSstFiles,
+ destSnapshotSstFiles,
+ rocksDBCheckpointDiffer.getForwardCompactionDAG(),
+ actualSameSstFiles,
+ actualDiffSstFiles);
} catch (RuntimeException rtEx) {
if (!expectingException) {
fail("Unexpected exception thrown in test.");
@@ -348,44 +499,37 @@ public class TestRocksDBCheckpointDiffer {
*/
@Test
void testDifferWithDB() throws Exception {
- RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(metadataDirName,
- sstBackUpDirName,
- compactionLogDirName,
- activeDbDirName,
- config);
- RocksDB rocksDB =
- createRocksDBInstanceAndWriteKeys(activeDbDirName, differ);
- readRocksDBInstance(activeDbDirName, rocksDB, null, differ);
+ writeKeysAndCheckpointing();
+ readRocksDBInstance(activeDbDirName, activeRocksDB, null,
+ rocksDBCheckpointDiffer);
if (LOG.isDebugEnabled()) {
printAllSnapshots();
}
- traverseGraph(differ.getCompactionNodeMap(),
- differ.getBackwardCompactionDAG(),
- differ.getForwardCompactionDAG());
+ traverseGraph(rocksDBCheckpointDiffer.getCompactionNodeMap(),
+ rocksDBCheckpointDiffer.getBackwardCompactionDAG(),
+ rocksDBCheckpointDiffer.getForwardCompactionDAG());
- diffAllSnapshots(differ);
+ diffAllSnapshots(rocksDBCheckpointDiffer);
// Confirm correct links created
try (Stream<Path> sstPathStream = Files.list(sstBackUpDir.toPath())) {
List<String> expectedLinks = sstPathStream.map(Path::getFileName)
.map(Object::toString).sorted().collect(Collectors.toList());
Assertions.assertEquals(expectedLinks, asList(
- "000015.sst", "000017.sst", "000019.sst", "000021.sst",
- "000022.sst", "000024.sst", "000026.sst"));
+ "000017.sst", "000019.sst", "000021.sst", "000023.sst",
+ "000024.sst", "000026.sst", "000029.sst"));
}
if (LOG.isDebugEnabled()) {
- differ.dumpCompactionNodeTable();
+ rocksDBCheckpointDiffer.dumpCompactionNodeTable();
}
- rocksDB.close();
- cleanUp();
+ cleanUpSnapshots();
}
- public void cleanup() {
+ public void cleanUpSnapshots() {
for (DifferSnapshotInfo snap : snapshots) {
snap.getRocksDB().close();
}
@@ -412,12 +556,12 @@ public class TestRocksDBCheckpointDiffer {
// Hard-coded expected output.
// The results are deterministic. Retrieved from a successful run.
final List<List<String>> expectedDifferResult = asList(
- asList("000024", "000017", "000028", "000026", "000019", "000021"),
- asList("000024", "000028", "000026", "000019", "000021"),
- asList("000024", "000028", "000026", "000021"),
- asList("000024", "000028", "000026"),
- asList("000028", "000026"),
- Collections.singletonList("000028"),
+ asList("000023", "000029", "000026", "000019", "000021", "000031"),
+ asList("000023", "000029", "000026", "000021", "000031"),
+ asList("000023", "000029", "000026", "000031"),
+ asList("000029", "000026", "000031"),
+ asList("000029", "000031"),
+ Collections.singletonList("000031"),
Collections.emptyList()
);
Assertions.assertEquals(snapshots.size(), expectedDifferResult.size());
@@ -437,8 +581,7 @@ public class TestRocksDBCheckpointDiffer {
/**
* Helper function that creates an RDB checkpoint (= Ozone snapshot).
*/
- private void createCheckpoint(RocksDBCheckpointDiffer differ,
- RocksDB rocksDB) throws RocksDBException {
+ private void createCheckpoint(RocksDB rocksDB) throws RocksDBException {
LOG.trace("Current time: " + System.currentTimeMillis());
long t1 = System.currentTimeMillis();
@@ -452,8 +595,6 @@ public class TestRocksDBCheckpointDiffer {
deleteDirectory(dir);
}
- final long dbLatestSequenceNumber = rocksDB.getLatestSequenceNumber();
-
createCheckPoint(activeDbDirName, cpPath, rocksDB);
final UUID snapshotId = UUID.randomUUID();
List<ColumnFamilyHandle> colHandle = new ArrayList<>();
@@ -464,21 +605,13 @@ public class TestRocksDBCheckpointDiffer {
colHandle));
this.snapshots.add(currentSnapshot);
- // Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
- differ.appendSnapshotInfoToCompactionLog(dbLatestSequenceNumber,
- snapshotId.toString(),
- System.currentTimeMillis());
-
- differ.setCurrentCompactionLog(dbLatestSequenceNumber);
-
long t2 = System.currentTimeMillis();
LOG.trace("Current time: " + t2);
LOG.debug("Time elapsed: " + (t2 - t1) + " ms");
}
// Flushes the WAL and Creates a RocksDB checkpoint
- void createCheckPoint(String dbPathArg, String cpPathArg,
- RocksDB rocksDB) {
+ void createCheckPoint(String dbPathArg, String cpPathArg, RocksDB rocksDB) {
LOG.debug("Creating RocksDB '{}' checkpoint at '{}'", dbPathArg,
cpPathArg);
try {
rocksDB.flush(new FlushOptions());
@@ -506,52 +639,24 @@ public class TestRocksDBCheckpointDiffer {
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
new ColumnFamilyDescriptor("keyTable".getBytes(UTF_8), cfOpts),
new ColumnFamilyDescriptor("directoryTable".getBytes(UTF_8), cfOpts),
- new ColumnFamilyDescriptor("fileTable".getBytes(UTF_8), cfOpts)
+ new ColumnFamilyDescriptor("fileTable".getBytes(UTF_8), cfOpts),
+ new ColumnFamilyDescriptor("compactionLogTable".getBytes(UTF_8),
cfOpts)
);
}
- // Test Code to create sample RocksDB instance.
- private RocksDB createRocksDBInstanceAndWriteKeys(String dbPathArg,
- RocksDBCheckpointDiffer differ) throws RocksDBException {
-
- LOG.debug("Creating RocksDB at '{}'", dbPathArg);
-
- // Delete the test DB dir if it exists
- File dir = new File(dbPathArg);
- if (dir.exists()) {
- deleteDirectory(dir);
- }
-
- final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
- .optimizeUniversalStyleCompaction();
- final List<ColumnFamilyDescriptor> cfDescriptors =
- getCFDescriptorList(cfOpts);
- List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
-
- // Create a RocksDB instance with compaction tracking
- final DBOptions dbOptions = new DBOptions()
- .setCreateIfMissing(true)
- .setCreateMissingColumnFamilies(true);
- differ.setRocksDBForCompactionTracking(dbOptions);
- RocksDB rocksDB = RocksDB.open(dbOptions, dbPathArg, cfDescriptors,
- cfHandles);
-
- differ.setCurrentCompactionLog(rocksDB.getLatestSequenceNumber());
-
- // key-value
+ private void writeKeysAndCheckpointing() throws RocksDBException {
for (int i = 0; i < NUM_ROW; ++i) {
String generatedString = RandomStringUtils.randomAlphabetic(7);
String keyStr = "Key-" + i + "-" + generatedString;
String valueStr = "Val-" + i + "-" + generatedString;
byte[] key = keyStr.getBytes(UTF_8);
// Put entry in keyTable
- rocksDB.put(cfHandles.get(1), key, valueStr.getBytes(UTF_8));
+ activeRocksDB.put(keyTableCFHandle, key, valueStr.getBytes(UTF_8));
if (i % SNAPSHOT_EVERY_SO_MANY_KEYS == 0) {
- createCheckpoint(differ, rocksDB);
+ createCheckpoint(activeRocksDB);
}
}
- createCheckpoint(differ, rocksDB);
- return rocksDB;
+ createCheckpoint(activeRocksDB);
}
private boolean deleteDirectory(File directoryToBeDeleted) {
@@ -883,15 +988,8 @@ public class TestRocksDBCheckpointDiffer {
Set<CompactionNode> levelToBeRemoved,
MutableGraph<CompactionNode> expectedDag,
Set<String> expectedFileNodesRemoved) {
-
- RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(metadataDirName,
- sstBackUpDirName,
- compactionLogDirName,
- activeDbDirName,
- config);
Set<String> actualFileNodesRemoved =
- differ.pruneBackwardDag(originalDag, levelToBeRemoved);
+ rocksDBCheckpointDiffer.pruneBackwardDag(originalDag,
levelToBeRemoved);
Assertions.assertEquals(expectedDag, originalDag);
Assertions.assertEquals(actualFileNodesRemoved, expectedFileNodesRemoved);
}
@@ -945,56 +1043,46 @@ public class TestRocksDBCheckpointDiffer {
Set<CompactionNode> levelToBeRemoved,
MutableGraph<CompactionNode> expectedDag,
Set<String> expectedFileNodesRemoved) {
-
- RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(metadataDirName,
- sstBackUpDirName,
- compactionLogDirName,
- activeDbDirName,
- config);
Set<String> actualFileNodesRemoved =
- differ.pruneForwardDag(originalDag, levelToBeRemoved);
+ rocksDBCheckpointDiffer.pruneForwardDag(originalDag, levelToBeRemoved);
Assertions.assertEquals(expectedDag, originalDag);
Assertions.assertEquals(actualFileNodesRemoved, expectedFileNodesRemoved);
}
+ @SuppressWarnings("methodlength")
private static Stream<Arguments> compactionDagPruningScenarios() {
long currentTimeMillis = System.currentTimeMillis();
String compactionLogFile0 = "S 1000 snapshotId0 " +
(currentTimeMillis - MINUTES.toMillis(30)) + " \n";
- String compactionLogFile1 = "C 1000 000015,000013,000011,000009:000018," +
+ String compactionLogFile1 = "C 1500 000015,000013,000011,000009:000018," +
"000016,000017\n"
+ "S 2000 snapshotId1 " +
(currentTimeMillis - MINUTES.toMillis(24)) + " \n";
- String compactionLogFile2 = "C 1000 000018,000016,000017,000026,000024," +
+ String compactionLogFile2 = "C 2500 000018,000016,000017,000026,000024," +
"000022,000020:000027,000030,000028,000031,000029\n"
+ "S 3000 snapshotId2 " +
(currentTimeMillis - MINUTES.toMillis(18)) + " \n";
- String compactionLogFile3 = "C 1000 000027,000030,000028,000031,000029," +
+ String compactionLogFile3 = "C 3500 000027,000030,000028,000031,000029," +
"000039,000037,000035,000033:000040,000044,000042,000043,000046," +
"000041,000045\n"
- + "S 3000 snapshotId3 " +
+ + "S 4000 snapshotId3 " +
(currentTimeMillis - MINUTES.toMillis(12)) + " \n";
- String compactionLogFile4 = "C 1000 000040,000044,000042,000043,000046," +
+ String compactionLogFile4 = "C 4500 000040,000044,000042,000043,000046," +
"000041,000045,000054,000052,000050,000048:000059,000055,000056," +
"000060,000057,000058\n"
- + "S 3000 snapshotId4 " +
+ + "S 5000 snapshotId4 " +
(currentTimeMillis - MINUTES.toMillis(6)) + " \n";
- String compactionLogFileWithoutSnapshot1 = "C 1000 000015,000013,000011," +
+ String compactionLogFileWithoutSnapshot1 = "C 1500 000015,000013,000011," +
"000009:000018,000016,000017\n" +
"C 2000 000018,000016,000017,000026,000024,000022,000020" +
":000027,000030,000028,000031,000029\n";
- String compactionLogFileWithoutSnapshot2 = "C 3000 000027,000030,000028," +
- "000031,000029,000039,000037,000035,000033:000040,000044,000042," +
- "000043,000046,000041,000045\n";
-
- String compactionLogFileWithoutSnapshot3 = "C 4000 000040,000044,000042," +
+ String compactionLogFileWithoutSnapshot2 = "C 4500 000040,000044,000042," +
"000043,000046,000041,000045,000054,000052,000050,000048:000059," +
"000055,000056,000060,000057,000058\n";
@@ -1022,28 +1110,18 @@ public class TestRocksDBCheckpointDiffer {
"S 3000 snapshotIdWithoutCompaction6 " +
(currentTimeMillis - MINUTES.toMillis(3)) + " \n";
- Set<String> expectedNodes = new HashSet<>(
- Arrays.asList("000054", "000052", "000050", "000048", "000059",
- "000055", "000056", "000060", "000057", "000058")
- );
-
- Set<String> expectedAllNodes = new HashSet<>(
- Arrays.asList("000058", "000013", "000035", "000057", "000056",
- "000011", "000033", "000055", "000018", "000017", "000039",
- "000016", "000015", "000037", "000059", "000060", "000043",
- "000020", "000042", "000041", "000040", "000024", "000046",
- "000045", "000022", "000044", "000029", "000028", "000027",
- "000026", "000048", "000009", "000050", "000054", "000031",
- "000030", "000052")
- );
+ Set<String> expectedNodes = ImmutableSet.of("000059", "000055", "000056",
+ "000060", "000057", "000058");
return Stream.of(
Arguments.of("Each compaction log file has only one snapshot and one" +
" compaction statement except first log file.",
Arrays.asList(compactionLogFile0, compactionLogFile1,
compactionLogFile2, compactionLogFile3, compactionLogFile4),
+ null,
expectedNodes,
- 4
+ 4,
+ 0
),
Arguments.of("Compaction log doesn't have snapshot because OM" +
" restarted. Restart happened before snapshot to be deleted.",
@@ -1051,39 +1129,30 @@ public class TestRocksDBCheckpointDiffer {
compactionLogFileWithoutSnapshot1,
compactionLogFile3,
compactionLogFile4),
+ null,
expectedNodes,
- 3
+ 4,
+ 0
),
Arguments.of("Compaction log doesn't have snapshot because OM" +
" restarted. Restart happened after snapshot to be deleted.",
Arrays.asList(compactionLogFile0, compactionLogFile1,
compactionLogFile2, compactionLogFile3,
- compactionLogFileWithoutSnapshot3,
+ compactionLogFileWithoutSnapshot2,
compactionLogFileOnlyWithSnapshot4),
+ null,
expectedNodes,
- 4
+ 4,
+ 0
),
Arguments.of("No compaction happened in between two snapshots.",
Arrays.asList(compactionLogFile0, compactionLogFile1,
compactionLogFile2, compactionLogFile3,
compactionLogFileOnlyWithSnapshot1,
compactionLogFileOnlyWithSnapshot2, compactionLogFile4),
+ null,
expectedNodes,
- 6
- ),
- Arguments.of("No snapshot is taken and only one compaction log file,",
- Collections.singletonList(compactionLogFileWithoutSnapshot1 +
- compactionLogFileWithoutSnapshot2 +
- compactionLogFileWithoutSnapshot3),
- expectedAllNodes,
- 0
- ),
- Arguments.of("No snapshot is taken but multiple compaction files" +
- " because of OM restart.",
- Arrays.asList(compactionLogFileWithoutSnapshot1,
- compactionLogFileWithoutSnapshot2,
- compactionLogFileWithoutSnapshot3),
- expectedAllNodes,
+ 4,
0
),
Arguments.of("Only contains snapshots but no compaction.",
@@ -1093,14 +1162,76 @@ public class TestRocksDBCheckpointDiffer {
compactionLogFileOnlyWithSnapshot4,
compactionLogFileOnlyWithSnapshot5,
compactionLogFileOnlyWithSnapshot6),
+ null,
Collections.emptySet(),
- 3
+ 0,
+ 0
),
Arguments.of("No file exists because compaction has not happened" +
" and snapshot is not taken.",
Collections.emptyList(),
+ null,
Collections.emptySet(),
+ 0,
+ 0
+ ),
+ Arguments.of("When compaction table is used case 1.",
+ null,
+ asList(createCompactionEntry(1500,
+ (currentTimeMillis - MINUTES.toMillis(24)),
+ asList("000015", "000013", "000011", "000009"),
+ asList("000018", "000016", "000017")),
+ createCompactionEntry(2500,
+ (currentTimeMillis - MINUTES.toMillis(20)),
+ asList("000018", "000016", "000017", "000026", "000024",
+ "000022", "000020"),
+ asList("000027", "000030", "000028", "000031", "000029")),
+ createCompactionEntry(3500,
+ (currentTimeMillis - MINUTES.toMillis(16)),
+ asList("000027", "000030", "000028", "000031", "000029",
+ "000039", "000037", "000035", "000033"),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045")),
+ createCompactionEntry(4500,
+ (currentTimeMillis - MINUTES.toMillis(12)),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045", "000054", "000052", "000050",
+ "000048"),
+ asList("000059", "000055", "000056", "000060", "000057",
+ "000058"))),
+ expectedNodes,
+ 4,
0
+ ),
+ Arguments.of("When compaction table is used case 2.",
+ null,
+ asList(createCompactionEntry(1500,
+ (currentTimeMillis - MINUTES.toMillis(24)),
+ asList("000015", "000013", "000011", "000009"),
+ asList("000018", "000016", "000017")),
+ createCompactionEntry(2500,
+ (currentTimeMillis - MINUTES.toMillis(18)),
+ asList("000018", "000016", "000017", "000026", "000024",
+ "000022", "000020"),
+ asList("000027", "000030", "000028", "000031", "000029")),
+ createCompactionEntry(3500,
+ (currentTimeMillis - MINUTES.toMillis(12)),
+ asList("000027", "000030", "000028", "000031", "000029",
+ "000039", "000037", "000035", "000033"),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045")),
+ createCompactionEntry(4500,
+ (currentTimeMillis - MINUTES.toMillis(6)),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045", "000054", "000052", "000050",
+ "000048"),
+ asList("000059", "000055", "000056", "000060", "000057",
+ "000058"))),
+ ImmutableSet.of("000059", "000055", "000056", "000060", "000057",
+ "000058", "000040", "000044", "000042", "000043", "000046",
+ "000041", "000045", "000054", "000052", "000050", "000048"),
+ 4,
+ 1
)
);
}
@@ -1113,40 +1244,46 @@ public class TestRocksDBCheckpointDiffer {
public void testPruneOlderSnapshotsWithCompactionHistory(
String description,
List<String> compactionLogs,
+ List<CompactionLogEntry> compactionLogEntries,
Set<String> expectedNodes,
- int expectedNumberOfLogFilesDeleted
+ int expectedNumberOfLogEntriesBeforePruning,
+ int expectedNumberOfLogEntriesAfterPruning
) throws IOException, ExecutionException, InterruptedException,
TimeoutException {
List<File> filesCreated = new ArrayList<>();
- for (int i = 0; i < compactionLogs.size(); i++) {
- String compactionFileName = metadataDirName + "/" + compactionLogDirName
- + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
- File compactionFile = new File(compactionFileName);
- Files.write(compactionFile.toPath(),
- compactionLogs.get(i).getBytes(UTF_8));
- filesCreated.add(compactionFile);
+ if (compactionLogs != null) {
+ for (int i = 0; i < compactionLogs.size(); i++) {
+ String compactionFileName = metadataDirName + "/" +
compactionLogDirName
+ + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
+ File compactionFile = new File(compactionFileName);
+ Files.write(compactionFile.toPath(),
+ compactionLogs.get(i).getBytes(UTF_8));
+ filesCreated.add(compactionFile);
+ }
+ } else if (compactionLogEntries != null) {
+ compactionLogEntries.forEach(entry ->
+ rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+ } else {
+ throw new IllegalArgumentException("One of compactionLog or" +
+ " compactionLogEntries should be present.");
}
- RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(metadataDirName,
- sstBackUpDirName,
- compactionLogDirName,
- activeDbDirName,
- config);
-
- differ.loadAllCompactionLogs();
-
- waitForLock(differ,
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
+ assertEquals(expectedNumberOfLogEntriesBeforePruning,
+ countEntriesInCompactionLogTable());
+ waitForLock(rocksDBCheckpointDiffer,
RocksDBCheckpointDiffer::pruneOlderSnapshotsWithCompactionHistory);
- Set<String> actualNodesInForwardDAG = differ.getForwardCompactionDAG()
+ Set<String> actualNodesInForwardDAG = rocksDBCheckpointDiffer
+ .getForwardCompactionDAG()
.nodes()
.stream()
.map(CompactionNode::getFileName)
.collect(Collectors.toSet());
- Set<String> actualNodesBackwardDAG = differ.getBackwardCompactionDAG()
+ Set<String> actualNodesBackwardDAG = rocksDBCheckpointDiffer
+ .getBackwardCompactionDAG()
.nodes()
.stream()
.map(CompactionNode::getFileName)
@@ -1155,15 +1292,25 @@ public class TestRocksDBCheckpointDiffer {
assertEquals(expectedNodes, actualNodesInForwardDAG);
assertEquals(expectedNodes, actualNodesBackwardDAG);
- for (int i = 0; i < expectedNumberOfLogFilesDeleted; i++) {
+ for (int i = 0; compactionLogs != null && i < compactionLogs.size(); i++) {
File compactionFile = filesCreated.get(i);
assertFalse(compactionFile.exists());
}
- for (int i = expectedNumberOfLogFilesDeleted; i < compactionLogs.size();
- i++) {
- File compactionFile = filesCreated.get(i);
- assertTrue(compactionFile.exists());
+ assertEquals(expectedNumberOfLogEntriesAfterPruning,
+ countEntriesInCompactionLogTable());
+ }
+
+ private int countEntriesInCompactionLogTable() {
+ try (ManagedRocksIterator iterator = new ManagedRocksIterator(
+ activeRocksDB.newIterator(compactionLogTableCFHandle))) {
+ iterator.get().seekToFirst();
+ int count = 0;
+ while (iterator.get().isValid()) {
+ iterator.get().next();
+ count++;
+ }
+ return count;
}
}
@@ -1191,20 +1338,43 @@ public class TestRocksDBCheckpointDiffer {
}
private static Stream<Arguments> sstFilePruningScenarios() {
+ List<String> initialFiles1 = Arrays.asList("000015", "000013", "000011",
+ "000009");
+ List<String> initialFiles2 = Arrays.asList("000015", "000013", "000011",
+ "000009", "000018", "000016", "000017", "000026", "000024", "000022",
+ "000020");
+ List<String> initialFiles3 = Arrays.asList("000015", "000013", "000011",
+ "000009", "000018", "000016", "000017", "000026", "000024", "000022",
+ "000020", "000027", "000030", "000028", "000031", "000029", "000039",
+ "000037", "000035", "000033", "000040", "000044", "000042", "000043",
+ "000046", "000041", "000045", "000054", "000052", "000050", "000048",
+ "000059", "000055", "000056", "000060", "000057", "000058");
+
+ List<String> expectedFiles1 = Arrays.asList("000015", "000013", "000011",
+ "000009");
+ List<String> expectedFiles2 = Arrays.asList("000015", "000013", "000011",
+ "000009", "000026", "000024", "000022", "000020");
+ List<String> expectedFiles3 = Arrays.asList("000013", "000024", "000035",
+ "000011", "000022", "000033", "000039", "000015", "000026", "000037",
+ "000048", "000009", "000050", "000054", "000020", "000052");
+
return Stream.of(
- Arguments.of("Case 1: No compaction.",
+ Arguments.of("Case 1 with compaction log file: " +
+ "No compaction.",
"",
- Arrays.asList("000015", "000013", "000011", "000009"),
- Arrays.asList("000015", "000013", "000011", "000009")
+ null,
+ initialFiles1,
+ expectedFiles1
),
- Arguments.of("Case 2: One level compaction.",
+ Arguments.of("Case 2 with compaction log file: " +
+ "One level compaction.",
"C 1 000015,000013,000011,000009:000018,000016,000017\n",
- Arrays.asList("000015", "000013", "000011", "000009", "000018",
- "000016", "000017", "000026", "000024", "000022", "000020"),
- Arrays.asList("000015", "000013", "000011", "000009", "000026",
- "000024", "000022", "000020")
+ null,
+ initialFiles2,
+ expectedFiles2
),
- Arguments.of("Case 3: Multi-level compaction.",
+ Arguments.of("Case 3 with compaction log file: " +
+ "Multi-level compaction.",
"C 1 000015,000013,000011,000009:000018,000016,000017\n" +
"C 2 000018,000016,000017,000026,000024,000022,000020:000027,"
+
"000030,000028,000031,000029\n" +
@@ -1213,20 +1383,74 @@ public class TestRocksDBCheckpointDiffer {
"C 4 000040,000044,000042,000043,000046,000041,000045,000054,"
+
"000052,000050,000048:000059,000055,000056,000060,000057," +
"000058\n",
- Arrays.asList("000015", "000013", "000011", "000009", "000018",
- "000016", "000017", "000026", "000024", "000022", "000020",
- "000027", "000030", "000028", "000031", "000029", "000039",
- "000037", "000035", "000033", "000040", "000044", "000042",
- "000043", "000046", "000041", "000045", "000054", "000052",
- "000050", "000048", "000059", "000055", "000056", "000060",
- "000057", "000058"),
- Arrays.asList("000013", "000024", "000035", "000011", "000022",
- "000033", "000039", "000015", "000026", "000037", "000048",
- "000009", "000050", "000054", "000020", "000052")
+ null,
+ initialFiles3,
+ expectedFiles3
+ ),
+ Arguments.of("Case 4 with compaction log table: " +
+ "No compaction.",
+ null,
+ Collections.emptyList(),
+ initialFiles1,
+ expectedFiles1
+ ),
+ Arguments.of("Case 5 with compaction log table: " +
+ "One level compaction.",
+ null,
+ Collections.singletonList(createCompactionEntry(1,
+ now(),
+ asList("000015", "000013", "000011", "000009"),
+ asList("000018", "000016", "000017"))),
+ initialFiles2,
+ expectedFiles2
+ ),
+ Arguments.of("Case 6 with compaction log table: " +
+ "Multi-level compaction.",
+ null,
+ asList(createCompactionEntry(1,
+ now(),
+ asList("000015", "000013", "000011", "000009"),
+ asList("000018", "000016", "000017")),
+ createCompactionEntry(2,
+ now(),
+ asList("000018", "000016", "000017", "000026", "000024",
+ "000022", "000020"),
+ asList("000027", "000030", "000028", "000031", "000029")),
+ createCompactionEntry(3,
+ now(),
+ asList("000027", "000030", "000028", "000031", "000029",
+ "000039", "000037", "000035", "000033"),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045")),
+ createCompactionEntry(4,
+ now(),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045", "000054", "000052", "000050",
+ "000048"),
+ asList("000059", "000055", "000056", "000060", "000057",
+ "000058"))),
+ initialFiles3,
+ expectedFiles3
)
);
}
+ private static CompactionLogEntry createCompactionEntry(
+ long dbSequenceNumber,
+ long compactionTime,
+ List<String> inputFiles,
+ List<String> outputFiles
+ ) {
+ return new CompactionLogEntry.Builder(dbSequenceNumber, compactionTime,
+ toFileInfoList(inputFiles), toFileInfoList(outputFiles)).build();
+ }
+
+ private static List<CompactionFileInfo> toFileInfoList(List<String> files) {
+ return files.stream()
+ .map(fileName -> new CompactionFileInfo.Builder(fileName).build())
+ .collect(Collectors.toList());
+ }
+
/**
* End-to-end test for SST file pruning.
*/
@@ -1235,30 +1459,37 @@ public class TestRocksDBCheckpointDiffer {
public void testSstFilePruning(
String description,
String compactionLog,
+ List<CompactionLogEntry> compactionLogEntries,
List<String> initialFiles,
List<String> expectedFiles
) throws IOException, ExecutionException, InterruptedException,
TimeoutException {
- createFileWithContext(metadataDirName + "/" + compactionLogDirName
- + "/compaction_log" + COMPACTION_LOG_FILE_NAME_SUFFIX,
- compactionLog);
for (String fileName : initialFiles) {
createFileWithContext(sstBackUpDir + "/" + fileName + SST_FILE_EXTENSION,
fileName);
}
- RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(metadataDirName,
- sstBackUpDirName,
- compactionLogDirName,
- activeDbDirName,
- config);
-
- differ.loadAllCompactionLogs();
+ Path compactionLogFilePath = null;
+ if (compactionLog != null) {
+ String compactionLogFileName = metadataDirName + "/" +
+ compactionLogDirName + "/compaction_log" +
+ COMPACTION_LOG_FILE_NAME_SUFFIX;
+ compactionLogFilePath = new File(compactionLogFileName).toPath();
+ createFileWithContext(compactionLogFileName, compactionLog);
+ assertTrue(Files.exists(compactionLogFilePath));
+ } else if (compactionLogEntries != null) {
+ compactionLogEntries.forEach(entry ->
+ rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+ } else {
+ throw new IllegalArgumentException("One of compactionLog or" +
+ " compactionLogEntries should be present.");
+ }
- waitForLock(differ, RocksDBCheckpointDiffer::pruneSstFiles);
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
+ waitForLock(rocksDBCheckpointDiffer,
+ RocksDBCheckpointDiffer::pruneSstFiles);
Set<String> actualFileSetAfterPruning;
try (Stream<Path> pathStream = Files.list(
@@ -1275,6 +1506,10 @@ public class TestRocksDBCheckpointDiffer {
Set<String> expectedFileSet = new HashSet<>(expectedFiles);
assertEquals(expectedFileSet, actualFileSetAfterPruning);
+
+ if (compactionLogFilePath != null) {
+ assertFalse(Files.exists(compactionLogFilePath));
+ }
}
private void createFileWithContext(String fileName, String context)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
index 10df45b268..8d61fb1a3f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
+import org.apache.ozone.rocksdiff.CompactionNode;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
import org.apache.ratis.server.protocol.TermIndex;
@@ -55,12 +57,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
-import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -71,6 +69,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
@@ -404,36 +403,48 @@ public class TestSnapshotBackgroundServices {
cluster.getOzoneManager(leaderOM.getOMNodeId());
Assertions.assertEquals(leaderOM, newFollowerOM);
- // Prepare baseline data for compaction logs
- String currentCompactionLogPath = newLeaderOM
- .getMetadataManager()
- .getStore()
- .getRocksDBCheckpointDiffer()
- .getCurrentCompactionLogPath();
- Assertions.assertNotNull(currentCompactionLogPath);
- int lastIndex = currentCompactionLogPath.lastIndexOf(OM_KEY_PREFIX);
- String compactionLogsPath = currentCompactionLogPath
- .substring(0, lastIndex);
- File compactionLogsDir = new File(compactionLogsPath);
- Assertions.assertNotNull(compactionLogsDir);
- File[] files = compactionLogsDir.listFiles();
- Assertions.assertNotNull(files);
- int numberOfLogFiles = files.length;
- long contentLength;
- Path currentCompactionLog = Paths.get(currentCompactionLogPath);
- try (BufferedReader bufferedReader =
- Files.newBufferedReader(currentCompactionLog)) {
- contentLength = bufferedReader.lines()
- .mapToLong(String::length)
- .reduce(0L, Long::sum);
- }
+ List<CompactionLogEntry> compactionLogEntriesOnPreviousLeader =
+ getCompactionLogEntries(leaderOM);
+
+ List<CompactionLogEntry> compactionLogEntriesOnNewLeader =
+ getCompactionLogEntries(newLeaderOM);
+ Assertions.assertEquals(compactionLogEntriesOnPreviousLeader,
+ compactionLogEntriesOnNewLeader);
+
+ Assertions.assertEquals(leaderOM.getMetadataManager().getStore()
+ .getRocksDBCheckpointDiffer().getForwardCompactionDAG().nodes()
+ .stream().map(CompactionNode::getFileName).collect(toSet()),
+ newLeaderOM.getMetadataManager().getStore()
+ .getRocksDBCheckpointDiffer().getForwardCompactionDAG().nodes()
+ .stream().map(CompactionNode::getFileName).collect(toSet()));
+ Assertions.assertEquals(leaderOM.getMetadataManager().getStore()
+ .getRocksDBCheckpointDiffer().getForwardCompactionDAG().edges()
+ .stream().map(edge ->
+ edge.source().getFileName() + "-" +
edge.target().getFileName())
+ .collect(toSet()),
+ newLeaderOM.getMetadataManager().getStore()
+ .getRocksDBCheckpointDiffer().getForwardCompactionDAG().edges()
+ .stream().map(edge ->
+ edge.source().getFileName() + "-" +
edge.target().getFileName())
+ .collect(toSet()));
- checkIfCompactionLogsGetAppendedByForcingCompaction(newLeaderOM,
- compactionLogsDir, numberOfLogFiles, contentLength,
- currentCompactionLog);
+ confirmSnapDiffForTwoSnapshotsDifferingBySingleKey(newLeaderOM);
+ }
- confirmSnapDiffForTwoSnapshotsDifferingBySingleKey(
- newLeaderOM);
+ private List<CompactionLogEntry> getCompactionLogEntries(OzoneManager om)
+ throws IOException {
+ List<CompactionLogEntry> compactionLogEntries = new ArrayList<>();
+ try (TableIterator<String,
+ ? extends Table.KeyValue<String, CompactionLogEntry>>
+ iterator = om.getMetadataManager().getCompactionLogTable()
+ .iterator()) {
+ iterator.seekToFirst();
+
+ while (iterator.hasNext()) {
+ compactionLogEntries.add(iterator.next().getValue());
+ }
+ }
+ return compactionLogEntries;
}
@Test
@@ -546,28 +557,6 @@ public class TestSnapshotBackgroundServices {
}, 1000, 10000);
}
- private static void checkIfCompactionLogsGetAppendedByForcingCompaction(
- OzoneManager ozoneManager,
- File compactionLogsDir, int numberOfLogFiles,
- long contentLength, Path currentCompactionLog)
- throws IOException {
- ozoneManager.getMetadataManager()
- .getStore()
- .compactDB();
- File[] files = compactionLogsDir.listFiles();
- Assertions.assertNotNull(files);
- int newNumberOfLogFiles = files.length;
- long newContentLength;
- try (BufferedReader bufferedReader =
- Files.newBufferedReader(currentCompactionLog)) {
- newContentLength = bufferedReader.lines()
- .mapToLong(String::length)
- .reduce(0L, Long::sum);
- }
- Assertions.assertTrue(numberOfLogFiles < newNumberOfLogFiles
- || contentLength < newContentLength);
- }
-
private static File getSstBackupDir(OzoneManager ozoneManager) {
String sstBackupDirPath = ozoneManager
.getMetadataManager()
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 1bcaefccfc..4ec14e1596 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -462,28 +462,6 @@ public final class OmSnapshotManager implements
AutoCloseable {
dbCheckpoint.getCheckpointLocation(), snapshotInfo.getName());
}
- final RocksDBCheckpointDiffer dbCpDiffer =
- store.getRocksDBCheckpointDiffer();
-
- if (dbCpDiffer != null) {
- final long dbLatestSequenceNumber = snapshotInfo.getDbTxSequenceNumber();
-
- Objects.requireNonNull(snapshotInfo.getSnapshotId(),
- "SnapshotId is null for snapshot: " + snapshotInfo.getName());
- // Write snapshot generation (latest sequence number) to compaction log.
- // This will be used for DAG reconstruction as snapshotGeneration.
- dbCpDiffer.appendSnapshotInfoToCompactionLog(dbLatestSequenceNumber,
- snapshotInfo.getSnapshotId().toString(),
- snapshotInfo.getCreationTime());
-
- // Set compaction log filename to the latest DB sequence number
- // right after taking the RocksDB checkpoint for Ozone snapshot.
- //
- // Note it doesn't matter if sequence number hasn't increased (even
though
- // it shouldn't happen), since the writer always appends the file.
- dbCpDiffer.setCurrentCompactionLog(dbLatestSequenceNumber);
- }
-
return dbCheckpoint;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]