This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this
push:
new 2554ec26d7 HDDS-7508. [Snapshot] Finish TestOMSnapshotDAG, restore
CompactionNode SnapshotID field for debugging (#3981)
2554ec26d7 is described below
commit 2554ec26d772233d6a2b98810583d1876606967b
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Dec 6 16:00:45 2022 -0800
HDDS-7508. [Snapshot] Finish TestOMSnapshotDAG, restore CompactionNode
SnapshotID field for debugging (#3981)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 5 +
.../common/src/main/resources/ozone-default.xml | 9 +
.../hadoop/hdds/utils/db/DBStoreBuilder.java | 26 +-
hadoop-hdds/rocksdb-checkpoint-differ/pom.xml | 4 +
.../org/apache/ozone/rocksdiff/CompactionNode.java | 80 ++++
.../apache/ozone/rocksdiff/DifferSnapshotInfo.java | 63 +++
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 388 +++++++----------
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 482 ++++++++++++++-------
.../hadoop/ozone/freon/TestOMSnapshotDAG.java | 85 ++--
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 3 +-
10 files changed, 715 insertions(+), 430 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 4293f3ef1e..74ed487e8c 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -99,6 +99,11 @@ public final class OzoneConfigKeys {
public static final String OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF =
"OFF";
+ public static final String OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE =
+ "ozone.metastore.rocksdb.cf.write.buffer.size";
+ public static final String
+ OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE_DEFAULT = "128MB";
+
public static final String OZONE_UNSAFEBYTEOPERATIONS_ENABLED =
"ozone.UnsafeByteOperations.enabled";
public static final boolean OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 788bc0e8a5..3e8499bf71 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -669,6 +669,15 @@
Check the rocksdb documentation for more details.
</description>
</property>
+ <property>
+ <name>ozone.metastore.rocksdb.cf.write.buffer.size</name>
+ <value>128MB</value>
+ <tag>OZONE, OM, SCM, STORAGE, PERFORMANCE</tag>
+ <description>
+ The write buffer (memtable) size for each column family of the rocksdb
+ store. Check the rocksdb documentation for more details.
+ </description>
+ </property>
<property>
<name>ozone.scm.db.dirs</name>
<value/>
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 9f94e1d2c5..5aad1aa526 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -39,11 +39,14 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE_DEFAULT;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
@@ -87,6 +90,8 @@ public final class DBStoreBuilder {
private ConfigurationSource configuration;
private CodecRegistry registry;
private String rocksDbStat;
+ // RocksDB column family write buffer size
+ private long rocksDbCfWriteBufferSize;
private RocksDBConfiguration rocksDBConfiguration;
// Flag to indicate if the RocksDB should be opened readonly.
private boolean openReadOnly = false;
@@ -129,6 +134,10 @@ public final class DBStoreBuilder {
this.rocksDbStat = configuration.getTrimmed(
OZONE_METADATA_STORE_ROCKSDB_STATISTICS,
OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT);
+ this.rocksDbCfWriteBufferSize = (long) configuration.getStorageSize(
+ OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE,
+ OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE_DEFAULT,
+ StorageUnit.BYTES);
this.rocksDBConfiguration = rocksDBConfiguration;
// Get default DBOptions and ColumnFamilyOptions from the default DB
@@ -275,7 +284,7 @@ public final class DBStoreBuilder {
// If default column family was not added, add it with the default options.
cfOptions.putIfAbsent(DEFAULT_COLUMN_FAMILY_NAME,
- getDefaultCfOptions());
+ getCfOptions(rocksDbCfWriteBufferSize));
for (Map.Entry<String, ManagedColumnFamilyOptions> entry:
cfOptions.entrySet()) {
@@ -284,7 +293,8 @@ public final class DBStoreBuilder {
if (options == null) {
LOG.debug("using default column family options for table: {}", name);
- tableConfigs.add(new TableConfig(name, getDefaultCfOptions()));
+ tableConfigs.add(new TableConfig(name,
+ getCfOptions(rocksDbCfWriteBufferSize)));
} else {
tableConfigs.add(new TableConfig(name, options));
}
@@ -298,6 +308,18 @@ public final class DBStoreBuilder {
.orElseGet(defaultCfProfile::getColumnFamilyOptions);
}
+ /**
+ * Get default column family options, but with column family write buffer
+ * size limit overridden.
+ * @param writeBufferSize Specify column family write buffer size.
+ * @return ManagedColumnFamilyOptions
+ */
+ private ManagedColumnFamilyOptions getCfOptions(long writeBufferSize) {
+ ManagedColumnFamilyOptions cfOpts = getDefaultCfOptions();
+ cfOpts.setWriteBufferSize(writeBufferSize);
+ return cfOpts;
+ }
+
/**
* Attempts to get RocksDB {@link ManagedDBOptions} from an ini config
* file. If that file does not exist, the value of {@code defaultDBOptions}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index c6af10fbc4..b288364f73 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -85,6 +85,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-test-utils</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java
new file mode 100644
index 0000000000..a7cfa27aaf
--- /dev/null
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ozone.rocksdiff;
+
+/**
+ * Node in the compaction DAG that represents an SST file.
+ */
+public class CompactionNode {
+ // Name of the SST file
+ private final String fileName;
+ // The last snapshot created before this node came into existence
+ private final String snapshotId;
+ private final long snapshotGeneration;
+ private final long totalNumberOfKeys;
+ private long cumulativeKeysReverseTraversal;
+
+ /**
+ * CompactionNode constructor.
+ * @param file SST file (filename without extension)
+ * @param ssId snapshotId field. Added here for improved debuggability only
+ * @param numKeys Number of keys in the SST
+ * @param seqNum Snapshot generation (sequence number)
+ */
+ public CompactionNode(String file, String ssId, long numKeys, long seqNum) {
+ fileName = file;
+ snapshotId = ssId;
+ totalNumberOfKeys = numKeys;
+ snapshotGeneration = seqNum;
+ cumulativeKeysReverseTraversal = 0L;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Node{%s}", fileName);
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public String getSnapshotId() {
+ return snapshotId;
+ }
+
+ public long getSnapshotGeneration() {
+ return snapshotGeneration;
+ }
+
+ public long getTotalNumberOfKeys() {
+ return totalNumberOfKeys;
+ }
+
+ public long getCumulativeKeysReverseTraversal() {
+ return cumulativeKeysReverseTraversal;
+ }
+
+ public void setCumulativeKeysReverseTraversal(
+ long cumulativeKeysReverseTraversal) {
+ this.cumulativeKeysReverseTraversal = cumulativeKeysReverseTraversal;
+ }
+
+ public void addCumulativeKeysReverseTraversal(long diff) {
+ this.cumulativeKeysReverseTraversal += diff;
+ }
+}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
new file mode 100644
index 0000000000..fb82bd280f
--- /dev/null
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ozone.rocksdiff;
+
+import java.util.Map;
+
+/**
+ * Snapshot information node class for the differ.
+ */
+public class DifferSnapshotInfo {
+ private final String dbPath;
+ private final String snapshotID;
+ private final long snapshotGeneration;
+
+ private final Map<String, String> tablePrefixes;
+
+ public DifferSnapshotInfo(String db, String id, long gen,
+ Map<String, String> prefixes) {
+ dbPath = db;
+ snapshotID = id;
+ snapshotGeneration = gen;
+ tablePrefixes = prefixes;
+ }
+
+ public String getDbPath() {
+ return dbPath;
+ }
+
+ public String getSnapshotID() {
+ return snapshotID;
+ }
+
+ public long getSnapshotGeneration() {
+ return snapshotGeneration;
+ }
+
+ public Map<String, String> getTablePrefixes() {
+ return tablePrefixes;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DifferSnapshotInfo{dbPath='%s', snapshotID='%s', " +
+ "snapshotGeneration=%d, tablePrefixes size=%s}",
+ dbPath, snapshotID, snapshotGeneration, tablePrefixes.size());
+ }
+
+}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 4ad8a0f9b2..7a8ade9f5f 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -18,6 +18,7 @@
package org.apache.ozone.rocksdiff;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import org.apache.commons.lang3.StringUtils;
@@ -42,6 +43,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -49,10 +51,9 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -132,12 +133,20 @@ public class RocksDBCheckpointDiffer {
private static final int LONG_MAX_STRLEN =
String.valueOf(Long.MAX_VALUE).length();
+ /**
+ * Used during DAG reconstruction.
+ */
private long reconstructionSnapshotGeneration;
+ private String reconstructionLastSnapshotID;
/**
* Dummy object that acts as a write lock in compaction listener.
*/
private final Object compactionListenerWriteLock = new Object();
+ /**
+ * Flag for testing. Skips SST file summary reader.
+ */
+ private boolean skipGetSSTFileSummary = false;
/**
* Constructor.
@@ -167,6 +176,16 @@ public class RocksDBCheckpointDiffer {
this.activeDBLocationStr = activeDBLocation.toString() + "/";
}
+ /**
+ * This constructor is only meant for unit testing.
+ */
+ @VisibleForTesting
+ RocksDBCheckpointDiffer() {
+ this.skipGetSSTFileSummary = true;
+ this.sstBackupDir = null;
+ this.activeDBLocationStr = null;
+ }
+
private void setCompactionLogDir(String metadataDir,
String compactionLogDirName) {
@@ -232,31 +251,6 @@ public class RocksDBCheckpointDiffer {
appendToCurrentCompactionLog("");
}
- // Node in the DAG to represent an SST file
- private static class CompactionNode {
- // Name of the SST file
- private final String fileName;
- // The last snapshot created before this node came into existence
- private final String snapshotId;
- private final long snapshotGeneration;
- private final long totalNumberOfKeys;
- private long cumulativeKeysReverseTraversal;
-
- CompactionNode(String file, String ssId, long numKeys, long seqNum) {
- fileName = file;
- // Retained for debuggability. Unused for now.
- snapshotId = ssId;
- totalNumberOfKeys = numKeys;
- snapshotGeneration = seqNum;
- cumulativeKeysReverseTraversal = 0L;
- }
-
- @Override
- public String toString() {
- return String.format("Node{%s}", fileName);
- }
- }
-
// Hash table to track CompactionNode for a given SST File.
private final ConcurrentHashMap<String, CompactionNode> compactionNodeMap =
new ConcurrentHashMap<>();
@@ -317,8 +311,10 @@ public class RocksDBCheckpointDiffer {
* snapshot (RDB checkpoint) is taken.
* @param sequenceNum RDB sequence number
*/
- public void appendSequenceNumberToCompactionLog(long sequenceNum) {
- final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum + "\n";
+ public void appendSequenceNumberToCompactionLog(long sequenceNum,
+ String snapshotID) {
+ final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum +
+ " " + snapshotID + "\n";
appendToCurrentCompactionLog(line);
}
@@ -357,6 +353,12 @@ public class RocksDBCheckpointDiffer {
public void onCompactionBegin(RocksDB db,
CompactionJobInfo compactionJobInfo) {
+ // TODO: Skip (return) if no snapshot has been taken yet
+
+ // Note the current compaction listener implementation does not
+ // differentiate which column family each SST belongs to. It is tracking
+ // all SST files.
+
synchronized (compactionListenerWriteLock) {
if (compactionJobInfo.inputFiles().size() == 0) {
@@ -374,6 +376,12 @@ public class RocksDBCheckpointDiffer {
Path srcFile = Paths.get(file);
try {
Files.createLink(link, srcFile);
+ } catch (FileAlreadyExistsException ignored) {
+ // This could happen if a previous compaction is a "trivial move",
+ // where output SST files are exactly the same as input files.
+ // Those SSTs are simply moved to the next level without rewrites
+ // or renames.
+ LOG.debug("SST file already exists: {}", file);
} catch (IOException e) {
LOG.error("Exception in creating hard link for {}", file);
throw new RuntimeException("Failed to create hard link", e);
@@ -391,6 +399,8 @@ public class RocksDBCheckpointDiffer {
public void onCompactionCompleted(RocksDB db,
CompactionJobInfo compactionJobInfo) {
+ // TODO: Skip (return) if no snapshot has been taken yet
+
synchronized (compactionListenerWriteLock) {
if (compactionJobInfo.inputFiles().isEmpty()) {
@@ -440,7 +450,9 @@ public class RocksDBCheckpointDiffer {
appendToCurrentCompactionLog(sb.toString());
// Populate the DAG
- populateCompactionDAG(inputFiles, outputFiles,
+ // TODO: Once SnapshotChainManager is put into use, set snapshotID to
+ // snapshotChainManager.getLatestGlobalSnapshot()
+ populateCompactionDAG(inputFiles, outputFiles, null,
db.getLatestSequenceNumber());
/*
if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
@@ -458,6 +470,16 @@ public class RocksDBCheckpointDiffer {
* @return number of keys
*/
private long getSSTFileSummary(String filename) throws RocksDBException {
+
+ if (skipGetSSTFileSummary) {
+ // For testing only
+ return 1L;
+ }
+
+ if (!filename.endsWith(SST_FILE_EXTENSION)) {
+ filename += SST_FILE_EXTENSION;
+ }
+
Options option = new Options();
SstFileReader reader = new SstFileReader(option);
@@ -542,6 +564,8 @@ public class RocksDBCheckpointDiffer {
rocksDB = RocksDB.openReadOnly(dbOptions, dbPathArg,
cfDescriptors, columnFamilyHandles);
+ // Note it retrieves only the selected column families by the descriptor
+ // i.e. keyTable, directoryTable, fileTable
List<LiveFileMetaData> liveFileMetaDataList =
rocksDB.getLiveFilesMetaData();
LOG.debug("SST File Metadata for DB: " + dbPathArg);
@@ -565,7 +589,7 @@ public class RocksDBCheckpointDiffer {
/**
* Process each line of compaction log text file input and populate the DAG.
*/
- private synchronized void processCompactionLogLine(String line) {
+ synchronized void processCompactionLogLine(String line) {
LOG.debug("Processing line: {}", line);
@@ -573,26 +597,34 @@ public class RocksDBCheckpointDiffer {
// Skip comments
LOG.debug("Comment line, skipped");
} else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
- // Read sequence number
- LOG.debug("Reading sequence number as snapshot generation");
- final String seqNumStr =
+ // Read sequence number, and snapshot ID
+ LOG.debug("Reading sequence number as snapshot generation, "
+ + "and snapshot ID");
+ final String trimmedStr =
line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+ final Scanner input = new Scanner(trimmedStr);
// This would be the snapshot generation for the nodes to come
- reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+ reconstructionSnapshotGeneration = input.nextLong();
+ // This is the snapshotID assigned to every single CompactionNode to come
+ reconstructionLastSnapshotID = input.nextLine().trim();
} else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
// Read compaction log entry
// Trim the beginning
- line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+ line = line.substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
final String[] io = line.split(":");
if (io.length != 2) {
- LOG.error("Invalid line in compaction log: {}", line);
+ if (line.endsWith(":")) {
+ LOG.debug("Ignoring compaction log line for SST deletion");
+ } else {
+ LOG.error("Invalid line in compaction log: {}", line);
+ }
return;
}
final String[] inputFiles = io[0].split(",");
final String[] outputFiles = io[1].split(",");
- populateCompactionDAG(asList(inputFiles),
- asList(outputFiles), reconstructionSnapshotGeneration);
+ populateCompactionDAG(asList(inputFiles), asList(outputFiles),
+ reconstructionLastSnapshotID, reconstructionSnapshotGeneration);
} else {
LOG.error("Invalid line in compaction log: {}", line);
}
@@ -635,47 +667,6 @@ public class RocksDBCheckpointDiffer {
}
}
- /**
- * Snapshot information node class for the differ.
- */
- public static class DifferSnapshotInfo {
- private final String dbPath;
- private final String snapshotID;
- private final long snapshotGeneration;
- private final Map<String, String> tablePrefixes;
-
- public DifferSnapshotInfo(String db, String id, long gen,
- Map<String, String> prefixes) {
- dbPath = db;
- snapshotID = id;
- snapshotGeneration = gen;
- tablePrefixes = prefixes;
- }
-
- public String getDbPath() {
- return dbPath;
- }
-
- public String getSnapshotID() {
- return snapshotID;
- }
-
- public long getSnapshotGeneration() {
- return snapshotGeneration;
- }
-
- public Map<String, String> getTablePrefixes() {
- return tablePrefixes;
- }
-
- @Override
- public String toString() {
- return "DifferSnapshotInfo{" + "dbPath='" + dbPath + '\''
- + ", snapshotID='" + snapshotID + '\'' + ", snapshotGeneration="
- + snapshotGeneration + '}';
- }
- }
-
/**
* Get a list of SST files that differ between src and destination
snapshots.
* <p>
@@ -687,20 +678,23 @@ public class RocksDBCheckpointDiffer {
public synchronized List<String> getSSTDiffList(
DifferSnapshotInfo src, DifferSnapshotInfo dest) {
- HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
- HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
+ // TODO: Reject or swap if dest is taken after src, once snapshot chain
+ // integration is done.
+
+ HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.getDbPath());
+ HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.getDbPath());
HashSet<String> fwdDAGSameFiles = new HashSet<>();
HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
LOG.debug("Doing forward diff from src '{}' to dest '{}'",
- src.dbPath, dest.dbPath);
+ src.getDbPath(), dest.getDbPath());
internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
forwardCompactionDAG, fwdDAGSameFiles, fwdDAGDifferentFiles);
if (LOG.isDebugEnabled()) {
- LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
- dest.dbPath + "':");
+ LOG.debug("Result of diff from src '" + src.getDbPath() + "' to dest '" +
+ dest.getDbPath() + "':");
StringBuilder logSB = new StringBuilder();
logSB.append("Fwd DAG same SST files: ");
@@ -758,17 +752,27 @@ public class RocksDBCheckpointDiffer {
/**
* Core getSSTDiffList logic.
+ * <p>
+ * For each SST in the src snapshot, traverse the DAG to find its final
+ * successors. If any of those successors match an SST in the dest
+ * snapshot, add it to the sameFiles map (as it doesn't need further
+ * diffing). Otherwise, add it to the differentFiles map, as it will
+ * need further diffing.
*/
- private void internalGetSSTDiffList(
+ void internalGetSSTDiffList(
DifferSnapshotInfo src, DifferSnapshotInfo dest,
- HashSet<String> srcSnapFiles, HashSet<String> destSnapFiles,
+ Set<String> srcSnapFiles, Set<String> destSnapFiles,
MutableGraph<CompactionNode> mutableGraph,
- HashSet<String> sameFiles, HashSet<String> differentFiles) {
+ Set<String> sameFiles, Set<String> differentFiles) {
+
+ // Sanity check
+ Preconditions.checkArgument(sameFiles.isEmpty(), "Set must be empty");
+ Preconditions.checkArgument(differentFiles.isEmpty(), "Set must be empty");
for (String fileName : srcSnapFiles) {
if (destSnapFiles.contains(fileName)) {
LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
- src.dbPath, dest.dbPath, fileName);
+ src.getDbPath(), dest.getDbPath(), fileName);
sameFiles.add(fileName);
continue;
}
@@ -776,7 +780,7 @@ public class RocksDBCheckpointDiffer {
CompactionNode infileNode = compactionNodeMap.get(fileName);
if (infileNode == null) {
LOG.debug("Source '{}' SST file '{}' is never compacted",
- src.dbPath, fileName);
+ src.getDbPath(), fileName);
differentFiles.add(fileName);
continue;
}
@@ -787,19 +791,33 @@ public class RocksDBCheckpointDiffer {
// Traversal level/depth indicator for debug print
int level = 1;
while (!currentLevel.isEmpty()) {
- LOG.debug("BFS level: {}. Current level has {} nodes.",
+ LOG.debug("Traversal level: {}. Current level has {} nodes.",
level++, currentLevel.size());
+ if (level >= 1000000) {
+ final String errorMsg = String.format(
+ "Graph traversal level exceeded allowed maximum (%d). "
+ + "This could be due to invalid input generating a "
+ + "loop in the traversal path. Same SSTs found so far: %s, "
+ + "different SSTs: %s", level, sameFiles, differentFiles);
+ LOG.error(errorMsg);
+ // Clear output in case of error. Expect fall back to full diff
+ sameFiles.clear();
+ differentFiles.clear();
+ // TODO: Revisit error handling here. Use custom exception?
+ throw new RuntimeException(errorMsg);
+ }
+
final Set<CompactionNode> nextLevel = new HashSet<>();
for (CompactionNode current : currentLevel) {
- LOG.debug("Processing node: {}", current.fileName);
- if (current.snapshotGeneration <= dest.snapshotGeneration) {
+ LOG.debug("Processing node: {}", current.getFileName());
+ if (current.getSnapshotGeneration() < dest.getSnapshotGeneration()) {
LOG.debug("Current node's snapshot generation '{}' "
+ "reached destination snapshot's '{}'. "
+ "Src '{}' and dest '{}' have different SST file: '{}'",
- current.snapshotGeneration, dest.snapshotGeneration,
- src.dbPath, dest.dbPath, current.fileName);
- differentFiles.add(current.fileName);
+ current.getSnapshotGeneration(), dest.getSnapshotGeneration(),
+ src.getDbPath(), dest.getDbPath(), current.getFileName());
+ differentFiles.add(current.getFileName());
continue;
}
@@ -807,28 +825,28 @@ public class RocksDBCheckpointDiffer {
if (successors.isEmpty()) {
LOG.debug("No further compaction happened to the current file. " +
"Src '{}' and dest '{}' have different file: {}",
- src.dbPath, dest.dbPath, current.fileName);
- differentFiles.add(current.fileName);
+ src.getDbPath(), dest.getDbPath(), current.getFileName());
+ differentFiles.add(current.getFileName());
continue;
}
for (CompactionNode node : successors) {
- if (sameFiles.contains(node.fileName) ||
- differentFiles.contains(node.fileName)) {
- LOG.debug("Skipping known processed SST: {}", node.fileName);
+ if (sameFiles.contains(node.getFileName()) ||
+ differentFiles.contains(node.getFileName())) {
+ LOG.debug("Skipping known processed SST: {}",
node.getFileName());
continue;
}
- if (destSnapFiles.contains(node.fileName)) {
+ if (destSnapFiles.contains(node.getFileName())) {
LOG.debug("Src '{}' and dest '{}' have the same SST: {}",
- src.dbPath, dest.dbPath, node.fileName);
- sameFiles.add(node.fileName);
+ src.getDbPath(), dest.getDbPath(), node.getFileName());
+ sameFiles.add(node.getFileName());
continue;
}
// Queue different SST to the next level
LOG.debug("Src '{}' and dest '{}' have a different SST: {}",
- src.dbPath, dest.dbPath, node.fileName);
+ src.getDbPath(), dest.getDbPath(), node.getFileName());
nextLevel.add(node);
}
}
@@ -840,7 +858,7 @@ public class RocksDBCheckpointDiffer {
static class NodeComparator
implements Comparator<CompactionNode>, Serializable {
public int compare(CompactionNode a, CompactionNode b) {
- return a.fileName.compareToIgnoreCase(b.fileName);
+ return a.getFileName().compareToIgnoreCase(b.getFileName());
}
@Override
@@ -854,78 +872,11 @@ public class RocksDBCheckpointDiffer {
List<CompactionNode> nodeList = compactionNodeMap.values().stream()
.sorted(new NodeComparator()).collect(Collectors.toList());
for (CompactionNode n : nodeList) {
- LOG.debug("File '{}' total keys: {}", n.fileName, n.totalNumberOfKeys);
- LOG.debug("File '{}' cumulative keys: {}", n.fileName,
- n.cumulativeKeysReverseTraversal);
- }
- }
-
- @VisibleForTesting
- public synchronized void printMutableGraphFromAGivenNode(String fileName,
- int sstLevel, MutableGraph<CompactionNode> mutableGraph) {
-
- CompactionNode infileNode = compactionNodeMap.get(fileName);
- if (infileNode == null) {
- return;
- }
- LOG.debug("Expanding file: {}. SST compaction level: {}",
- fileName, sstLevel);
- Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.add(infileNode);
- int levelCounter = 1;
- while (!currentLevel.isEmpty()) {
- LOG.debug("DAG Level: {}", levelCounter++);
- final Set<CompactionNode> nextLevel = new HashSet<>();
- StringBuilder sb = new StringBuilder();
- for (CompactionNode current : currentLevel) {
- Set<CompactionNode> successors = mutableGraph.successors(current);
- for (CompactionNode succNode : successors) {
- sb.append(succNode.fileName).append(" ");
- nextLevel.add(succNode);
- }
- }
- LOG.debug("{}", sb);
- currentLevel = nextLevel;
- }
- }
-
- synchronized void printMutableGraph(String srcSnapId, String destSnapId,
- MutableGraph<CompactionNode> mutableGraph) {
-
- LOG.debug("Gathering all SST file nodes from src '{}' to dest '{}'",
- srcSnapId, destSnapId);
-
- final Queue<CompactionNode> nodeQueue = new LinkedList<>();
- // Queue source snapshot SST file nodes
- for (CompactionNode node : mutableGraph.nodes()) {
- if (srcSnapId == null ||
- node.snapshotId.compareToIgnoreCase(srcSnapId) == 0) {
- nodeQueue.add(node);
- }
- }
-
- final Set<CompactionNode> allNodesSet = new HashSet<>();
- while (!nodeQueue.isEmpty()) {
- CompactionNode node = nodeQueue.poll();
- Set<CompactionNode> succSet = mutableGraph.successors(node);
- LOG.debug("Current node: {}", node);
- if (succSet.isEmpty()) {
- LOG.debug("Has no successor node");
- allNodesSet.add(node);
- continue;
- }
- for (CompactionNode succNode : succSet) {
- LOG.debug("Has successor node: {}", succNode);
- if (srcSnapId == null ||
- succNode.snapshotId.compareToIgnoreCase(destSnapId) == 0) {
- allNodesSet.add(succNode);
- continue;
- }
- nodeQueue.add(succNode);
- }
+ LOG.debug("File '{}' total keys: {}",
+ n.getFileName(), n.getTotalNumberOfKeys());
+ LOG.debug("File '{}' cumulative keys: {}",
+ n.getFileName(), n.getCumulativeKeysReverseTraversal());
}
-
- LOG.debug("Files are: {}", allNodesSet);
}
public MutableGraph<CompactionNode> getForwardCompactionDAG() {
@@ -940,14 +891,16 @@ public class RocksDBCheckpointDiffer {
* Helper method to add a new file node to the DAG.
* @return CompactionNode
*/
- private CompactionNode addNodeToDAG(String file, long seqNum) {
+ private CompactionNode addNodeToDAG(String file, String snapshotID,
+ long seqNum) {
long numKeys = 0L;
try {
numKeys = getSSTFileSummary(file);
} catch (RocksDBException e) {
LOG.warn("Can't get num of keys in SST '{}': {}", file, e.getMessage());
}
- CompactionNode fileNode = new CompactionNode(file, null, numKeys, seqNum);
+ CompactionNode fileNode = new CompactionNode(
+ file, snapshotID, numKeys, seqNum);
forwardCompactionDAG.addNode(fileNode);
backwardCompactionDAG.addNode(fileNode);
@@ -956,9 +909,14 @@ public class RocksDBCheckpointDiffer {
/**
* Populate the compaction DAG with input and output SST files lists.
+ * @param inputFiles List of compaction input files.
+ * @param outputFiles List of compaction output files.
+ * @param snapshotId Snapshot ID for debugging purpose. In fact, this can be
+ * an arbitrary String as long as it helps debugging.
+ * @param seqNum DB transaction sequence number.
*/
private void populateCompactionDAG(List<String> inputFiles,
- List<String> outputFiles, long seqNum) {
+ List<String> outputFiles, String snapshotId, long seqNum) {
if (LOG.isDebugEnabled()) {
LOG.debug("Input files: {} -> Output files: {}", inputFiles,
outputFiles);
@@ -966,13 +924,13 @@ public class RocksDBCheckpointDiffer {
for (String outfile : outputFiles) {
final CompactionNode outfileNode = compactionNodeMap.computeIfAbsent(
- outfile, file -> addNodeToDAG(file, seqNum));
+ outfile, file -> addNodeToDAG(file, snapshotId, seqNum));
for (String infile : inputFiles) {
final CompactionNode infileNode = compactionNodeMap.computeIfAbsent(
- infile, file -> addNodeToDAG(file, seqNum));
+ infile, file -> addNodeToDAG(file, snapshotId, seqNum));
// Draw the edges
- if (!outfileNode.fileName.equals(infileNode.fileName)) {
+ if (!outfileNode.getFileName().equals(infileNode.getFileName())) {
forwardCompactionDAG.putEdge(outfileNode, infileNode);
backwardCompactionDAG.putEdge(infileNode, outfileNode);
}
@@ -981,65 +939,6 @@ public class RocksDBCheckpointDiffer {
}
- @VisibleForTesting
- public synchronized void traverseGraph(
- MutableGraph<CompactionNode> reverseMutableGraph,
- MutableGraph<CompactionNode> fwdMutableGraph) {
-
- List<CompactionNode> nodeList = compactionNodeMap.values().stream()
- .sorted(new NodeComparator()).collect(Collectors.toList());
-
- for (CompactionNode infileNode : nodeList) {
- // fist go through fwdGraph to find nodes that don't have successors.
- // These nodes will be the top level nodes in reverse graph
- Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode);
- if (successors.size() == 0) {
- LOG.debug("No successors. Cumulative keys: {}, total keys: {}",
- infileNode.cumulativeKeysReverseTraversal,
- infileNode.totalNumberOfKeys);
- infileNode.cumulativeKeysReverseTraversal =
- infileNode.totalNumberOfKeys;
- }
- }
-
- HashSet<CompactionNode> visited = new HashSet<>();
- for (CompactionNode infileNode : nodeList) {
- if (visited.contains(infileNode)) {
- continue;
- }
- visited.add(infileNode);
- LOG.debug("Visiting node '{}'", infileNode.fileName);
- Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.add(infileNode);
- int level = 1;
- while (!currentLevel.isEmpty()) {
- LOG.debug("BFS Level: {}. Current level has {} nodes",
- level++, currentLevel.size());
- final Set<CompactionNode> nextLevel = new HashSet<>();
- for (CompactionNode current : currentLevel) {
- LOG.debug("Expanding node: {}", current.fileName);
- Set<CompactionNode> successors =
- reverseMutableGraph.successors(current);
- if (successors.isEmpty()) {
- LOG.debug("No successors. Cumulative keys: {}",
- current.cumulativeKeysReverseTraversal);
- continue;
- }
- for (CompactionNode node : successors) {
- LOG.debug("Adding to the next level: {}", node.fileName);
- LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}",
- node.fileName, node.cumulativeKeysReverseTraversal,
- current.fileName, current.totalNumberOfKeys);
- node.cumulativeKeysReverseTraversal +=
- current.cumulativeKeysReverseTraversal;
- nextLevel.add(node);
- }
- }
- currentLevel = nextLevel;
- }
- }
- }
-
@VisibleForTesting
public boolean debugEnabled(Integer level) {
return DEBUG_LEVEL.contains(level);
@@ -1050,4 +949,9 @@ public class RocksDBCheckpointDiffer {
return LOG;
}
+ @VisibleForTesting
+ public ConcurrentHashMap<String, CompactionNode> getCompactionNodeMap() {
+ return compactionNodeMap;
+ }
+
}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index aa19220042..590e740575 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -26,17 +26,30 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
-import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;
+import com.google.common.graph.MutableGraph;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -90,8 +103,167 @@ public class TestRocksDBCheckpointDiffer {
GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO);
}
+ /**
+ * Test cases for testGetSSTDiffListWithoutDB.
+ */
+ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
+
+ DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo(
+ "/path/to/dbcp1", "ssUUID1", 3008L, null);
+ DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo(
+ "/path/to/dbcp2", "ssUUID2", 14980L, null);
+ DifferSnapshotInfo snapshotInfo3 = new DifferSnapshotInfo(
+ "/path/to/dbcp3", "ssUUID3", 17975L, null);
+ DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
+ "/path/to/dbcp4", "ssUUID4", 18000L, null);
+
+ Set<String> snapshotSstFiles1 = new HashSet<>(asList(
+ "000059", "000053"));
+ Set<String> snapshotSstFiles2 = new HashSet<>(asList(
+ "000088", "000059", "000053", "000095"));
+ Set<String> snapshotSstFiles3 = new HashSet<>(asList(
+ "000088", "000105", "000059", "000053", "000095"));
+ Set<String> snapshotSstFiles4 = new HashSet<>(asList(
+ "000088", "000105", "000059", "000053", "000095", "000108"));
+ Set<String> snapshotSstFiles1Alt1 = new HashSet<>(asList(
+ "000059", "000053", "000066"));
+ Set<String> snapshotSstFiles1Alt2 = new HashSet<>(asList(
+ "000059", "000053", "000052"));
+ Set<String> snapshotSstFiles2Alt2 = new HashSet<>(asList(
+ "000088", "000059", "000053", "000095", "000099"));
+ Set<String> snapshotSstFiles2Alt3 = new HashSet<>(asList(
+ "000088", "000059", "000053", "000062"));
+
+ return Stream.of(
+ Arguments.of("Test 1: Regular case. Expands expandable " +
+ "SSTs in the initial diff.",
+ snapshotInfo3,
+ snapshotInfo1,
+ snapshotSstFiles3,
+ snapshotSstFiles1,
+ new HashSet<>(asList("000059", "000053")),
+ new HashSet<>(asList(
+ "000066", "000105", "000080", "000087", "000073", "000095")),
+ false),
+ Arguments.of("Test 2: Crafted input: One source " +
+ "('to' snapshot) SST file is never compacted (newly flushed)",
+ snapshotInfo4,
+ snapshotInfo3,
+ snapshotSstFiles4,
+ snapshotSstFiles3,
+ new HashSet<>(asList(
+ "000088", "000105", "000059", "000053", "000095")),
+ new HashSet<>(asList("000108")),
+ false),
+ Arguments.of("Test 3: Crafted input: Same SST files " +
+ "found during SST expansion",
+ snapshotInfo2,
+ snapshotInfo1,
+ snapshotSstFiles2,
+ snapshotSstFiles1Alt1,
+ new HashSet<>(asList("000066", "000059", "000053")),
+ new HashSet<>(asList(
+ "000080", "000087", "000073", "000095")),
+ false),
+ Arguments.of("Test 4: Crafted input: Skipping known " +
+ "processed SST.",
+ snapshotInfo2,
+ snapshotInfo1,
+ snapshotSstFiles2Alt2,
+ snapshotSstFiles1Alt2,
+ new HashSet<>(),
+ new HashSet<>(),
+ true),
+ Arguments.of("Test 5: Hit snapshot generation early exit " +
+ "condition",
+ snapshotInfo2,
+ snapshotInfo1,
+ snapshotSstFiles2Alt3,
+ snapshotSstFiles1,
+ new HashSet<>(asList("000059", "000053")),
+ new HashSet<>(asList(
+ "000066", "000080", "000087", "000073", "000062")),
+ false)
+ );
+ }
+
+ /**
+ * Tests core SST diff list logic. Does not involve DB.
+ * Focuses on testing edge cases in internalGetSSTDiffList().
+ */
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("casesGetSSTDiffListWithoutDB")
+ @SuppressWarnings("parameternumber")
+ public void testGetSSTDiffListWithoutDB(String description,
+ DifferSnapshotInfo srcSnapshot,
+ DifferSnapshotInfo destSnapshot,
+ Set<String> srcSnapshotSstFiles,
+ Set<String> destSnapshotSstFiles,
+ Set<String> expectedSameSstFiles,
+ Set<String> expectedDiffSstFiles,
+ boolean expectingException) {
+
+ RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer();
+ boolean exceptionThrown = false;
+
+ String compactionLog = ""
+ + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5\n" // Snapshot 0
+ + "C 000001,000002:000062\n"
+ // Additional "compaction" to trigger and test early exit condition
+ + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5\n" // Snapshot 1
+ + "C 000068,000062:000069\n" // Regular compaction
+ + "C 000071,000064,000060,000052:000071,000064,000060,000052\n"
+ // Trivial move
+ + "C 000073,000066:000074\n"
+ + "C 000082,000076,000069:000083\n"
+ + "C 000087,000080,000074:000088\n"
+ + "C 000093,000090,000083:\n" // Deletion?
+ + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd\n" // Snapshot 2
+ + "C 000098,000096,000085,000078,000071,000064,000060,000052:000099\n"
+ + "C 000105,000095,000088:000107\n"
+ + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea\n"; // Snapshot 3
+
+ // Construct DAG from compaction log input
+ Arrays.stream(compactionLog.split("\n")).forEach(
+ differ::processCompactionLogLine);
+
+ Set<String> actualSameSstFiles = new HashSet<>();
+ Set<String> actualDiffSstFiles = new HashSet<>();
+
+ try {
+ differ.internalGetSSTDiffList(
+ srcSnapshot,
+ destSnapshot,
+ srcSnapshotSstFiles,
+ destSnapshotSstFiles,
+ differ.getForwardCompactionDAG(),
+ actualSameSstFiles,
+ actualDiffSstFiles);
+ } catch (RuntimeException rtEx) {
+ if (!expectingException) {
+ fail("Unexpected exception thrown in test.");
+ } else {
+ exceptionThrown = true;
+ }
+ }
+
+ // Check same and different SST files result
+ Assertions.assertEquals(expectedSameSstFiles, actualSameSstFiles);
+ Assertions.assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
+
+ if (expectingException && !exceptionThrown) {
+ fail("Expecting exception but none thrown.");
+ }
+ }
+
+ /**
+ * Tests DB listener (compaction log generation, SST backup),
+ * SST file list diff.
+ * <p>
+ * Does actual DB write, flush, compaction.
+ */
@Test
- void testMain() throws Exception {
+ void testDifferWithDB() throws Exception {
final String clDirStr = "compaction-log";
// Delete the compaction log dir for the test, if it exists
@@ -121,12 +293,21 @@ public class TestRocksDBCheckpointDiffer {
printAllSnapshots();
}
- differ.traverseGraph(
+ traverseGraph(differ.getCompactionNodeMap(),
differ.getBackwardCompactionDAG(),
differ.getForwardCompactionDAG());
diffAllSnapshots(differ);
+ // Confirm correct links created
+ try (Stream<Path> sstPathStream = Files.list(sstDir.toPath())) {
+ List<String> expectedLinks = sstPathStream.map(Path::getFileName)
+ .map(Object::toString).sorted().collect(Collectors.toList());
+ Assertions.assertEquals(expectedLinks, asList(
+ "000015.sst", "000017.sst", "000019.sst", "000021.sst",
+ "000022.sst", "000024.sst", "000026.sst"));
+ }
+
if (LOG.isDebugEnabled()) {
differ.dumpCompactionNodeTable();
}
@@ -215,7 +396,8 @@ public class TestRocksDBCheckpointDiffer {
this.snapshots.add(currentSnapshot);
// Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
- differ.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber);
+ differ.appendSequenceNumberToCompactionLog(
+ dbLatestSequenceNumber, snapshotId);
differ.setCurrentCompactionLog(dbLatestSequenceNumber);
@@ -300,158 +482,6 @@ public class TestRocksDBCheckpointDiffer {
return directoryToBeDeleted.delete();
}
- /**
- * RocksDB.DEFAULT_COLUMN_FAMILY.
- */
- private void updateRocksDBInstance(String dbPathArg, RocksDB rocksDB) {
- System.out.println("Updating RocksDB instance at :" + dbPathArg);
-
- try (Options options = new Options().setCreateIfMissing(true)) {
- if (rocksDB == null) {
- rocksDB = RocksDB.open(options, dbPathArg);
- }
-
- Random random = new Random();
- // key-value
- for (int i = 0; i < NUM_ROW; ++i) {
- String generatedString = getRandomString(random, 7);
- String keyStr = " MyUpdated" + generatedString + "StringKey" + i;
- String valueStr = " My Updated" + generatedString + "StringValue" + i;
- byte[] key = keyStr.getBytes(UTF_8);
- rocksDB.put(key, valueStr.getBytes(UTF_8));
- System.out.println(toStr(rocksDB.get(key)));
- }
- } catch (RocksDBException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * RocksDB.DEFAULT_COLUMN_FAMILY.
- */
- public void testDefaultColumnFamilyOriginal() {
- System.out.println("testDefaultColumnFamily begin...");
-
- try (Options options = new Options().setCreateIfMissing(true)) {
- try (RocksDB rocksDB = RocksDB.open(options, "./rocksdb-data")) {
- // key-value
- byte[] key = "Hello".getBytes(UTF_8);
- rocksDB.put(key, "World".getBytes(UTF_8));
-
- System.out.println(toStr(rocksDB.get(key)));
-
- rocksDB.put("SecondKey".getBytes(UTF_8),
"SecondValue".getBytes(UTF_8));
-
- // List
- List<byte[]> keys = asList(key, "SecondKey".getBytes(UTF_8),
- "missKey".getBytes(UTF_8));
- List<byte[]> values = rocksDB.multiGetAsList(keys);
- for (int i = 0; i < keys.size(); i++) {
- System.out.println("multiGet " + toStr(keys.get(i)) + ":" +
- (values.get(i) != null ? toStr(values.get(i)) : null));
- }
-
- // [key - value]
- RocksIterator iter = rocksDB.newIterator();
- for (iter.seekToFirst(); iter.isValid(); iter.next()) {
- System.out.println("iterator key:" + toStr(iter.key()) + ", " +
- "iter value:" + toStr(iter.value()));
- }
-
- // key
- rocksDB.delete(key);
- System.out.println("after remove key:" + toStr(key));
-
- iter = rocksDB.newIterator();
- for (iter.seekToFirst(); iter.isValid(); iter.next()) {
- System.out.println("iterator key:" + toStr(iter.key()) + ", " +
- "iter value:" + toStr(iter.value()));
- }
- }
- } catch (RocksDBException e) {
- e.printStackTrace();
- }
- }
-
- // (table)
- public void testCertainColumnFamily() {
- System.out.println("\ntestCertainColumnFamily begin...");
- try (ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
- .optimizeUniversalStyleCompaction()) {
- String cfName = "my-first-columnfamily";
- // list of column family descriptors, first entry must always be
- // default column family
- final List<ColumnFamilyDescriptor> cfDescriptors = asList(
- new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
- new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), cfOpts)
- );
-
- List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
- try (DBOptions dbOptions = new DBOptions()
- .setCreateIfMissing(true)
- .setCreateMissingColumnFamilies(true);
- RocksDB rocksDB = RocksDB.open(dbOptions,
- "./rocksdb-data-cf/", cfDescriptors, cfHandles)) {
- ColumnFamilyHandle cfHandle = cfHandles.stream().filter(x -> {
- try {
- return (toStr(x.getName())).equals(cfName);
- } catch (RocksDBException e) {
- return false;
- }
- }).collect(Collectors.toList()).get(0);
-
- // key/value
- String key = "FirstKey";
- rocksDB.put(cfHandles.get(0), key.getBytes(UTF_8),
- "FirstValue".getBytes(UTF_8));
- // key
- byte[] getValue = rocksDB.get(cfHandles.get(0), key.getBytes(UTF_8));
- LOG.debug("get Value: " + toStr(getValue));
- // key/value
- rocksDB.put(cfHandles.get(1), "SecondKey".getBytes(UTF_8),
- "SecondValue".getBytes(UTF_8));
-
- List<byte[]> keys = asList(key.getBytes(UTF_8),
- "SecondKey".getBytes(UTF_8));
- List<ColumnFamilyHandle> cfHandleList = asList(cfHandle, cfHandle);
-
- // key
- List<byte[]> values = rocksDB.multiGetAsList(cfHandleList, keys);
- for (int i = 0; i < keys.size(); i++) {
- LOG.debug("multiGet:" + toStr(keys.get(i)) + "--" +
- (values.get(i) == null ? null : toStr(values.get(i))));
- }
-
- List<LiveFileMetaData> liveFileMetaDataList =
- rocksDB.getLiveFilesMetaData();
- for (LiveFileMetaData m : liveFileMetaDataList) {
- System.out.println("Live File Metadata");
- System.out.println("\tFile :" + m.fileName());
- System.out.println("\tTable :" + toStr(m.columnFamilyName()));
- System.out.println("\tKey Range :" + toStr(m.smallestKey()) +
- " " + "<->" + toStr(m.largestKey()));
- }
-
- // key
- rocksDB.delete(cfHandle, key.getBytes(UTF_8));
-
- // key
- RocksIterator iter = rocksDB.newIterator(cfHandle);
- for (iter.seekToFirst(); iter.isValid(); iter.next()) {
- System.out.println("Iterator:" + toStr(iter.key()) + ":" +
- toStr(iter.value()));
- }
- } finally {
- // NOTE frees the column family handles before freeing the db
- for (final ColumnFamilyHandle cfHandle : cfHandles) {
- cfHandle.close();
- }
- }
- } catch (RocksDBException e) {
- e.printStackTrace();
- } // frees the column family options
- }
-
// Read from a given RocksDB instance and optionally write all the
// keys to a given file.
void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file,
@@ -478,7 +508,9 @@ public class TestRocksDBCheckpointDiffer {
LOG.debug("\tKey Range: {}", toStr(m.smallestKey())
+ " <-> " + toStr(m.largestKey()));
if (differ.debugEnabled(DEBUG_DAG_LIVE_NODES)) {
- differ.printMutableGraphFromAGivenNode(m.fileName(), m.level(),
+ printMutableGraphFromAGivenNode(
+ differ.getCompactionNodeMap(),
+ m.fileName(), m.level(),
differ.getForwardCompactionDAG());
}
}
@@ -510,4 +542,140 @@ public class TestRocksDBCheckpointDiffer {
private String toStr(byte[] bytes) {
return new String(bytes, UTF_8);
}
+
+ /**
+ * Helper that traverses the graphs for testing.
+ * @param compactionNodeMap map from SST file name to its CompactionNode
+ * @param reverseMutableGraph the backward (reverse) compaction DAG
+ * @param fwdMutableGraph the forward compaction DAG
+ */
+ private void traverseGraph(
+ ConcurrentHashMap<String, CompactionNode> compactionNodeMap,
+ MutableGraph<CompactionNode> reverseMutableGraph,
+ MutableGraph<CompactionNode> fwdMutableGraph) {
+
+ List<CompactionNode> nodeList = compactionNodeMap.values().stream()
+ .sorted(new NodeComparator()).collect(Collectors.toList());
+
+ for (CompactionNode infileNode : nodeList) {
+ // first go through fwdGraph to find nodes that don't have successors.
+ // These nodes will be the top level nodes in reverse graph
+ Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode);
+ if (successors.size() == 0) {
+ LOG.debug("No successors. Cumulative keys: {}, total keys: {}",
+ infileNode.getCumulativeKeysReverseTraversal(),
+ infileNode.getTotalNumberOfKeys());
+ infileNode.setCumulativeKeysReverseTraversal(
+ infileNode.getTotalNumberOfKeys());
+ }
+ }
+
+ Set<CompactionNode> visited = new HashSet<>();
+
+ for (CompactionNode infileNode : nodeList) {
+ if (visited.contains(infileNode)) {
+ continue;
+ }
+ visited.add(infileNode);
+ LOG.debug("Visiting node '{}'", infileNode.getFileName());
+ Set<CompactionNode> currentLevel = new HashSet<>();
+ currentLevel.add(infileNode);
+ int level = 1;
+ while (!currentLevel.isEmpty()) {
+ LOG.debug("BFS Level: {}. Current level has {} nodes",
+ level++, currentLevel.size());
+ final Set<CompactionNode> nextLevel = new HashSet<>();
+ for (CompactionNode current : currentLevel) {
+ LOG.debug("Expanding node: {}", current.getFileName());
+ Set<CompactionNode> successors =
+ reverseMutableGraph.successors(current);
+ if (successors.isEmpty()) {
+ LOG.debug("No successors. Cumulative keys: {}",
+ current.getCumulativeKeysReverseTraversal());
+ continue;
+ }
+ for (CompactionNode node : successors) {
+ LOG.debug("Adding to the next level: {}", node.getFileName());
+ LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}",
+ node.getFileName(), node.getCumulativeKeysReverseTraversal(),
+ current.getFileName(), current.getTotalNumberOfKeys());
+ node.addCumulativeKeysReverseTraversal(
+ current.getCumulativeKeysReverseTraversal());
+ nextLevel.add(node);
+ }
+ }
+ currentLevel = nextLevel;
+ }
+ }
+ }
+
+ private void printMutableGraphFromAGivenNode(
+ ConcurrentHashMap<String, CompactionNode> compactionNodeMap,
+ String fileName,
+ int sstLevel,
+ MutableGraph<CompactionNode> mutableGraph) {
+
+ CompactionNode infileNode = compactionNodeMap.get(fileName);
+ if (infileNode == null) {
+ return;
+ }
+ LOG.debug("Expanding file: {}. SST compaction level: {}",
+ fileName, sstLevel);
+ Set<CompactionNode> currentLevel = new HashSet<>();
+ currentLevel.add(infileNode);
+ int levelCounter = 1;
+ while (!currentLevel.isEmpty()) {
+ LOG.debug("DAG Level: {}", levelCounter++);
+ final Set<CompactionNode> nextLevel = new HashSet<>();
+ StringBuilder sb = new StringBuilder();
+ for (CompactionNode current : currentLevel) {
+ Set<CompactionNode> successors = mutableGraph.successors(current);
+ for (CompactionNode succNode : successors) {
+ sb.append(succNode.getFileName()).append(" ");
+ nextLevel.add(succNode);
+ }
+ }
+ LOG.debug("{}", sb);
+ currentLevel = nextLevel;
+ }
+ }
+
+ private void printMutableGraph(String srcSnapId, String destSnapId,
+ MutableGraph<CompactionNode> mutableGraph) {
+
+ LOG.debug("Gathering all SST file nodes from src '{}' to dest '{}'",
+ srcSnapId, destSnapId);
+
+ final Queue<CompactionNode> nodeQueue = new LinkedList<>();
+ // Queue source snapshot SST file nodes
+ for (CompactionNode node : mutableGraph.nodes()) {
+ if (srcSnapId == null ||
+ node.getSnapshotId().compareToIgnoreCase(srcSnapId) == 0) {
+ nodeQueue.add(node);
+ }
+ }
+
+ final Set<CompactionNode> allNodesSet = new HashSet<>();
+ while (!nodeQueue.isEmpty()) {
+ CompactionNode node = nodeQueue.poll();
+ Set<CompactionNode> succSet = mutableGraph.successors(node);
+ LOG.debug("Current node: {}", node);
+ if (succSet.isEmpty()) {
+ LOG.debug("Has no successor node");
+ allNodesSet.add(node);
+ continue;
+ }
+ for (CompactionNode succNode : succSet) {
+ LOG.debug("Has successor node: {}", succNode);
+ if (srcSnapId == null ||
+ succNode.getSnapshotId().compareToIgnoreCase(destSnapId) == 0) {
+ allNodesSet.add(succNode);
+ continue;
+ }
+ nodeQueue.add(succNode);
+ }
+ }
+
+ LOG.debug("Files are: {}", allNodesSet);
+ }
}
\ No newline at end of file
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index de02528e8a..c89f20c8c9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -32,8 +33,8 @@ import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
-import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -49,7 +50,6 @@ import picocli.CommandLine;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -77,7 +77,6 @@ public class TestOMSnapshotDAG {
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
- *
*/
@BeforeAll
public static void init() throws Exception {
@@ -94,7 +93,12 @@ public class TestOMSnapshotDAG {
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3));
conf.setFromObject(raftClientConfig);
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
+ // Set DB CF write buffer to a much lower value so that flush and
compaction
+ // happens much more frequently without having to create a lot of keys.
+ conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_CF_WRITE_BUFFER_SIZE,
+ "256KB");
+
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
cluster.waitForClusterToBeReady();
store = cluster.getClient().getObjectStore();
@@ -160,28 +164,29 @@ public class TestOMSnapshotDAG {
}
@Test
- void testZeroSizeKey()
- throws IOException, InterruptedException, TimeoutException {
+ public void testDAGReconstruction()
+ throws IOException, InterruptedException, TimeoutException {
+ // Generate keys
RandomKeyGenerator randomKeyGenerator =
new RandomKeyGenerator(cluster.getConf());
CommandLine cmd = new CommandLine(randomKeyGenerator);
cmd.execute("--num-of-volumes", "1",
"--num-of-buckets", "1",
- "--num-of-keys", "600",
+ "--num-of-keys", "500",
"--num-of-threads", "1",
- "--key-size", "0",
+ "--key-size", "0", // zero size keys. since we don't need to test DNs
"--factor", "THREE",
"--type", "RATIS",
"--validate-writes"
);
- Assertions.assertEquals(600L, randomKeyGenerator.getNumberOfKeysAdded());
- Assertions.assertEquals(600L,
+ Assertions.assertEquals(500L, randomKeyGenerator.getNumberOfKeysAdded());
+ Assertions.assertEquals(500L,
randomKeyGenerator.getSuccessfulValidationCount());
List<OmVolumeArgs> volList = cluster.getOzoneManager()
- .listAllVolumes("", "", 10);
+ .listAllVolumes("", "", 2);
LOG.debug("List of all volumes: {}", volList);
final String volumeName = volList.stream().filter(e ->
!e.getVolume().equals(OZONE_S3_VOLUME_NAME_DEFAULT)) // Ignore s3v vol
@@ -198,12 +203,12 @@ public class TestOMSnapshotDAG {
final OzoneVolume volume = store.getVolume(volumeName);
final OzoneBucket bucket = volume.getBucket(bucketName);
- for (int i = 0; i < 6000; i++) {
+ for (int i = 0; i < 2000; i++) {
bucket.createKey("b_" + i, 0).close();
}
// Create another snapshot
- resp = store.createSnapshot(volumeName, bucketName, "snap3");
+ resp = store.createSnapshot(volumeName, bucketName, "snap2");
LOG.debug("Snapshot created: {}", resp);
// Get snapshot SST diff list
@@ -214,27 +219,51 @@ public class TestOMSnapshotDAG {
DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager,
volumeName, bucketName, "snap1");
- DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
- volumeName, bucketName, "snap3");
+ DifferSnapshotInfo snap2 = getDifferSnapshotInfo(omMetadataManager,
+ volumeName, bucketName, "snap2");
// RocksDB does checkpointing in a separate thread, wait for it
final File checkpointSnap1 = new File(snap1.getDbPath());
GenericTestUtils.waitFor(checkpointSnap1::exists, 2000, 20000);
+ final File checkpointSnap2 = new File(snap2.getDbPath());
+ GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000);
+
+ List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1);
+ LOG.debug("Got diff list: {}", sstDiffList21);
+
+ // Delete 1000 keys, take a 3rd snapshot, and do another diff
+ for (int i = 0; i < 1000; i++) {
+ bucket.deleteKey("b_" + i);
+ }
+
+ resp = store.createSnapshot(volumeName, bucketName, "snap3");
+ LOG.debug("Snapshot created: {}", resp);
+
+ DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
+ volumeName, bucketName, "snap3");
final File checkpointSnap3 = new File(snap3.getDbPath());
GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);
- List<String> actualDiffList = differ.getSSTDiffList(snap3, snap1);
- LOG.debug("Got diff list: {}", actualDiffList);
- // Hard-coded expected output.
- // The result is deterministic. Retrieved from a successful run.
- final List<String> expectedDiffList = Collections.singletonList("000059");
- Assertions.assertEquals(expectedDiffList, actualDiffList);
-
- // TODO: Use smaller DB write buffer size (currently it is set to 128 MB
- // in DBProfile), or generate enough keys (in the millions) to trigger
- // RDB compaction. Take another snapshot and do the diff again.
- // Then restart OM, do the same diff again to see if DAG reconstruction
- // works.
+ List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2);
+
+ // snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1
+ List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1);
+
+ // Same snapshot. Result should be empty list
+ List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2);
+ Assertions.assertTrue(sstDiffList22.isEmpty());
+
+ // Test DAG reconstruction by restarting OM. Then do the same diffs again
+ cluster.restartOzoneManager();
+
+ List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1);
+ Assertions.assertEquals(sstDiffList21, sstDiffList21Run2);
+
+ List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2);
+ Assertions.assertEquals(sstDiffList32, sstDiffList32Run2);
+
+ List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1);
+ Assertions.assertEquals(sstDiffList31, sstDiffList31Run2);
}
-}
\ No newline at end of file
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index a7fa062909..45bdafee7d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -150,7 +150,8 @@ public final class OmSnapshotManager {
// Write snapshot generation (latest sequence number) to compaction log.
// This will be used for DAG reconstruction as snapshotGeneration.
- dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber);
+ dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber,
+ snapshotInfo.getSnapshotID());
// Set compaction log filename to the latest DB sequence number
// right after taking the RocksDB checkpoint for Ozone snapshot.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]