This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8f745fcc96 HDDS-8137. [Snapshot] SnapDiff to use tombstone entries in SST files (#4376)
8f745fcc96 is described below
commit 8f745fcc961fc72b88005cbf669cfd503f8d3d93
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Tue Apr 18 13:19:22 2023 -0700
HDDS-8137. [Snapshot] SnapDiff to use tombstone entries in SST files (#4376)
---
.../common/src/main/resources/ozone-default.xml | 18 ++
.../utils/db/managed/ManagedSSTDumpIterator.java | 105 ++++++-----
hadoop-hdds/rocksdb-checkpoint-differ/pom.xml | 4 +
.../ozone/rocksdb/util/ManagedSstFileReader.java | 186 ++++++++++++++++----
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 11 ++
hadoop-ozone/ozone-manager/pom.xml | 4 +
.../ozone/om/snapshot/SnapshotDiffManager.java | 193 ++++++++++++++-------
7 files changed, 386 insertions(+), 135 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 018ff80667..f8f908b909 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3656,4 +3656,22 @@
without using the optimised DAG based pruning approach
</description>
</property>
+
+ <property>
+ <name>ozone.om.snapshot.sst_dumptool.pool.size</name>
+ <value>1</value>
+ <tag>OZONE, OM</tag>
+ <description>
+ Maximum number of threads in the SST dump tool thread pool used to
+ compute snapshot diffs when the native RocksDB tools library is loaded.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.om.snapshot.sst_dumptool.buffer.size</name>
+ <value>8KB</value>
+ <tag>OZONE, OM</tag>
+ <description>
+ Size of the pipe buffer used by the SST dump tool to compute snapshot
+ diffs when the native RocksDB tools library is loaded.
+ </description>
+ </property>
</configuration>
diff --git
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
index 35aaeb33b0..a083a2b988 100644
---
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
+++
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
@@ -17,34 +17,39 @@
package org.apache.hadoop.hdds.utils.db.managed;
+import org.apache.hadoop.util.ClosableIterator;
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import org.eclipse.jetty.io.RuntimeIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* Iterator to Parse output of RocksDBSSTDumpTool.
*/
-public class ManagedSSTDumpIterator implements
- Iterator<ManagedSSTDumpIterator.KeyValue>, AutoCloseable {
- private static final String SST_DUMP_TOOL_CLASS =
- "org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool";
+public abstract class ManagedSSTDumpIterator<T> implements ClosableIterator<T>
{
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ManagedSSTDumpIterator.class);
private static final String PATTERN_REGEX =
- "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
+ "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
public static final int PATTERN_KEY_GROUP_NUMBER = 1;
public static final int PATTERN_SEQ_GROUP_NUMBER = 2;
public static final int PATTERN_TYPE_GROUP_NUMBER = 3;
- private static final Pattern PATTERN_MATCHER =
- Pattern.compile(PATTERN_REGEX);
+ private static final Pattern PATTERN_MATCHER =
Pattern.compile(PATTERN_REGEX);
private BufferedReader processOutput;
private StringBuilder stdoutString;
@@ -56,32 +61,33 @@ public class ManagedSSTDumpIterator implements
private ManagedSSTDumpTool.SSTDumpToolTask sstDumpToolTask;
private AtomicBoolean open;
+ private StackTraceElement[] stackTrace;
public ManagedSSTDumpIterator(ManagedSSTDumpTool sstDumpTool,
- String sstFilePath,
- ManagedOptions options) throws IOException,
- NativeLibraryNotLoadedException {
+ String sstFilePath, ManagedOptions options)
+ throws IOException, NativeLibraryNotLoadedException {
File sstFile = new File(sstFilePath);
if (!sstFile.exists()) {
throw new IOException(String.format("File in path : %s doesn't exist",
- sstFile.getAbsolutePath()));
+ sstFile.getAbsolutePath()));
}
if (!sstFile.isFile()) {
throw new IOException(String.format("Path given: %s is not a file",
- sstFile.getAbsolutePath()));
+ sstFile.getAbsolutePath()));
}
init(sstDumpTool, sstFile, options);
+ this.stackTrace = Thread.currentThread().getStackTrace();
}
private void init(ManagedSSTDumpTool sstDumpTool, File sstFile,
ManagedOptions options)
- throws NativeLibraryNotLoadedException {
- String[] args = {"--file=" + sstFile.getAbsolutePath(),
- "--command=scan"};
+ throws NativeLibraryNotLoadedException {
+ String[] args = {"--file=" + sstFile.getAbsolutePath(), "--command=scan"};
this.sstDumpToolTask = sstDumpTool.run(args, options);
- processOutput = new BufferedReader(new InputStreamReader(
- sstDumpToolTask.getPipedOutput(), StandardCharsets.UTF_8));
+ processOutput = new BufferedReader(
+ new InputStreamReader(sstDumpToolTask.getPipedOutput(),
+ StandardCharsets.UTF_8));
stdoutString = new StringBuilder();
currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
charBuffer = new char[8192];
@@ -97,15 +103,16 @@ public class ManagedSSTDumpIterator implements
if (!this.open.get()) {
throw new RuntimeException("Iterator has been closed");
}
- if (sstDumpToolTask.getFuture().isDone()
- && sstDumpToolTask.exitValue() != 0) {
+ if (sstDumpToolTask.getFuture().isDone() &&
+ sstDumpToolTask.exitValue() != 0) {
throw new RuntimeException("Process Terminated with non zero " +
- String.format("exit value %d", sstDumpToolTask.exitValue()));
+ String.format("exit value %d", sstDumpToolTask.exitValue()));
}
}
/**
* Checks the status of the process & sees if there is another record.
+ *
* @return True if next exists & false otherwise
* Throws Runtime Exception in case of SST File read failure
*/
@@ -116,21 +123,30 @@ public class ManagedSSTDumpIterator implements
return nextKey != null;
}
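+ /**
+ * Converts a record parsed from the sst_dump scan output into the
+ * element returned by {@link #next()}.
+ *
+ * NOTE(review): next() null-checks its current record on other paths
+ * but passes it here unguarded, so implementations may receive null;
+ * confirm they handle it.
+ *
+ * @param value parsed record (key, sequence number, type and value)
+ * @return the element to hand back to the caller of next()
+ */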
+ protected abstract T getTransformedValue(KeyValue value);
+
/**
* Returns the next record from SSTDumpTool.
+ *
* @return next Key
* Throws Runtime Exception incase of failure.
*/
@Override
- public KeyValue next() {
+ public T next() {
checkSanityOfProcess();
currentKey = nextKey;
nextKey = null;
while (!currentMatcher.find()) {
try {
if (prevMatchEndIndex != 0) {
- stdoutString = new StringBuilder(stdoutString.substring(
- prevMatchEndIndex, stdoutString.length()));
+ stdoutString = new StringBuilder(
+ stdoutString.substring(prevMatchEndIndex,
stdoutString.length()));
prevMatchEndIndex = 0;
currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
}
@@ -138,9 +154,10 @@ public class ManagedSSTDumpIterator implements
if (numberOfCharsRead < 0) {
if (currentKey != null) {
currentKey.setValue(stdoutString.substring(0,
- Math.max(stdoutString.length() - 1, 0)));
+ Math.max(stdoutString.length() - 1, 0)));
+ return getTransformedValue(currentKey);
}
- return currentKey;
+ throw new NoSuchElementException("No more elements found");
}
stdoutString.append(charBuffer, 0, numberOfCharsRead);
currentMatcher.reset();
@@ -150,30 +167,42 @@ public class ManagedSSTDumpIterator implements
}
if (currentKey != null) {
currentKey.setValue(stdoutString.substring(prevMatchEndIndex,
- currentMatcher.start() - 1));
+ currentMatcher.start() - 1));
}
prevMatchEndIndex = currentMatcher.end();
- nextKey = new KeyValue(
- currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
- currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
- currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
- return currentKey;
+ nextKey = new KeyValue(currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
+ currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
+ currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
+ return getTransformedValue(currentKey);
}
@Override
- public synchronized void close() throws Exception {
+ public synchronized void close() throws UncheckedIOException {
if (this.sstDumpToolTask != null) {
if (!this.sstDumpToolTask.getFuture().isDone()) {
this.sstDumpToolTask.getFuture().cancel(true);
}
- this.processOutput.close();
+ try {
+ this.processOutput.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
open.compareAndSet(true, false);
}
@Override
protected void finalize() throws Throwable {
+ if (open.get()) {
+ LOG.warn("{} is not closed properly." +
+ " StackTrace for unclosed instance: {}",
+ this.getClass().getName(),
+ Arrays.stream(stackTrace)
+ .map(StackTraceElement::toString).collect(
+ Collectors.joining("\n")));
+ }
this.close();
+ super.finalize();
}
/**
@@ -214,12 +243,8 @@ public class ManagedSSTDumpIterator implements
@Override
public String toString() {
- return "KeyValue{" +
- "key='" + key + '\'' +
- ", sequence=" + sequence +
- ", type=" + type +
- ", value='" + value + '\'' +
- '}';
+ return "KeyValue{" + "key='" + key + '\'' + ", sequence=" + sequence +
+ ", type=" + type + ", value='" + value + '\'' + '}';
}
}
}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index f2a932b40a..3c3764d386 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -73,6 +73,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-rocks-native</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
index cf8e59331d..ce05cd9715 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
@@ -18,13 +18,19 @@
package org.apache.ozone.rocksdb.util;
-import org.rocksdb.Options;
+import org.apache.hadoop.util.ClosableIterator;
+import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
-import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -46,42 +52,153 @@ public class ManagedSstFileReader {
public ManagedSstFileReader(final Collection<String> sstFiles) {
this.sstFiles = sstFiles;
}
- public Stream<String> getKeyStream() throws RocksDBException {
- final ManagedSstFileIterator itr = new ManagedSstFileIterator(sstFiles);
- final Spliterator<String> spliterator = Spliterators
- .spliteratorUnknownSize(itr, 0);
- return StreamSupport.stream(spliterator, false).onClose(itr::close);
+
+ public static <T> Stream<T> getStreamFromIterator(ClosableIterator<T> itr) {
+ final Spliterator<T> spliterator =
+ Spliterators.spliteratorUnknownSize(itr, 0);
+ return StreamSupport.stream(spliterator, false).onClose(() -> {
+ itr.close();
+ });
+ }
+
+ public Stream<String> getKeyStream() throws RocksDBException,
+ NativeLibraryNotLoadedException, IOException {
+ // TODO: [SNAPSHOT] Check if default Options and ReadOptions are enough.
+ final MultipleSstFileIterator<String> itr =
+ new MultipleSstFileIterator<String>(sstFiles) {
+ private ManagedOptions options;
+ private ReadOptions readOptions;
+
+ @Override
+ protected void init() {
+ this.options = new ManagedOptions();
+ this.readOptions = new ManagedReadOptions();
+ }
+
+ @Override
+ protected ClosableIterator<String> getKeyIteratorForFile(String file)
+ throws RocksDBException {
+ return new ManagedSstFileIterator(file, options, readOptions) {
+ @Override
+ protected String getIteratorValue(
+ SstFileReaderIterator iterator) {
+ return new String(iterator.key(), UTF_8);
+ }
+ };
+ }
+
+ @Override
+ public void close() throws UncheckedIOException {
+ super.close();
+ options.close();
+ readOptions.close();
+ }
+ };
+ return getStreamFromIterator(itr);
+ }
+
+ public Stream<String> getKeyStreamWithTombstone(
+ ManagedSSTDumpTool sstDumpTool) throws IOException, RocksDBException,
+ NativeLibraryNotLoadedException {
+ final MultipleSstFileIterator<String> itr =
+ new MultipleSstFileIterator<String>(sstFiles) {
+ //TODO: [SNAPSHOT] Check if default Options is enough.
+ private ManagedOptions options;
+
+ @Override
+ protected void init() {
+ this.options = new ManagedOptions();
+ }
+
+ @Override
+ protected ClosableIterator<String> getKeyIteratorForFile(String file)
+ throws NativeLibraryNotLoadedException, IOException {
+ return new ManagedSSTDumpIterator<String>(sstDumpTool, file,
+ options) {
+ @Override
+ protected String getTransformedValue(KeyValue value) {
+ return value.getKey();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws UncheckedIOException {
+ super.close();
+ options.close();
+ }
+ };
+ return getStreamFromIterator(itr);
+ }
+
+ private abstract static class ManagedSstFileIterator implements
+ ClosableIterator<String> {
+ private SstFileReader fileReader;
+ private SstFileReaderIterator fileReaderIterator;
+
+ ManagedSstFileIterator(String path, ManagedOptions options,
+ ReadOptions readOptions)
+ throws RocksDBException {
+ this.fileReader = new SstFileReader(options);
+ this.fileReader.open(path);
+ this.fileReaderIterator = fileReader.newIterator(readOptions);
+ fileReaderIterator.seekToFirst();
+ }
+
+ @Override
+ public void close() {
+ this.fileReaderIterator.close();
+ this.fileReader.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return fileReaderIterator.isValid();
+ }
+
+ protected abstract String getIteratorValue(SstFileReaderIterator iterator);
+
+ @Override
+ public String next() {
+ String value = getIteratorValue(fileReaderIterator);
+ fileReaderIterator.next();
+ return value;
+ }
}
- private static final class ManagedSstFileIterator implements
- Iterator<String>, Closeable {
+ private abstract static class MultipleSstFileIterator<T> implements
+ ClosableIterator<T> {
private final Iterator<String> fileNameIterator;
- private final Options options;
- private final ReadOptions readOptions;
+
private String currentFile;
- private SstFileReader currentFileReader;
- private SstFileReaderIterator currentFileIterator;
+ private ClosableIterator<T> currentFileIterator;
- private ManagedSstFileIterator(Collection<String> files)
- throws RocksDBException {
- // TODO: Check if default Options and ReadOptions is enough.
- this.options = new Options();
- this.readOptions = new ReadOptions();
+ private MultipleSstFileIterator(Collection<String> files)
+ throws IOException, RocksDBException,
+ NativeLibraryNotLoadedException {
this.fileNameIterator = files.iterator();
+ init();
moveToNextFile();
}
+ protected abstract void init();
+
+ protected abstract ClosableIterator<T> getKeyIteratorForFile(String file)
+ throws RocksDBException, NativeLibraryNotLoadedException,
+ IOException;
+
@Override
public boolean hasNext() {
try {
do {
- if (currentFileIterator.isValid()) {
+ if (currentFileIterator.hasNext()) {
return true;
}
} while (moveToNextFile());
- } catch (RocksDBException e) {
- // TODO: This exception has to be handled by the caller.
+ } catch (IOException | RocksDBException |
+ NativeLibraryNotLoadedException e) {
+ // TODO: [Snapshot] This exception has to be handled by the caller.
// We have to do better exception handling.
throw new RuntimeException(e);
}
@@ -89,37 +206,36 @@ public class ManagedSstFileReader {
}
@Override
- public String next() {
+ public T next() {
if (hasNext()) {
- final String value = new String(currentFileIterator.key(), UTF_8);
- currentFileIterator.next();
- return value;
+ return currentFileIterator.next();
}
- throw new NoSuchElementException("No more keys");
+ throw new NoSuchElementException("No more elements found.");
}
@Override
- public void close() {
- closeCurrentFile();
+ public void close() throws UncheckedIOException {
+ try {
+ closeCurrentFile();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
- private boolean moveToNextFile() throws RocksDBException {
+ private boolean moveToNextFile() throws IOException, RocksDBException,
+ NativeLibraryNotLoadedException {
if (fileNameIterator.hasNext()) {
closeCurrentFile();
currentFile = fileNameIterator.next();
- currentFileReader = new SstFileReader(options);
- currentFileReader.open(currentFile);
- currentFileIterator = currentFileReader.newIterator(readOptions);
- currentFileIterator.seekToFirst();
+ this.currentFileIterator = getKeyIteratorForFile(currentFile);
return true;
}
return false;
}
- private void closeCurrentFile() {
+ private void closeCurrentFile() throws IOException {
if (currentFile != null) {
currentFileIterator.close();
- currentFileReader.close();
currentFile = null;
}
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 937835fdb7..34e17519e0 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -28,6 +28,17 @@ import org.apache.ratis.util.TimeDuration;
* Ozone Manager Constants.
*/
public final class OMConfigKeys {
+ public static final String
+ OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE =
+ "ozone.om.snapshot.sst_dumptool.pool.size";
+ public static final int
+ OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT = 1;
+ public static final String
+ OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE =
+ "ozone.om.snapshot.sst_dumptool.buffer.size";
+ public static final String
+ OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT = "8KB";
+
/**
* Never constructed.
*/
diff --git a/hadoop-ozone/ozone-manager/pom.xml
b/hadoop-ozone/ozone-manager/pom.xml
index 974565206d..a2b8b284d0 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -254,6 +254,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-rocks-native</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 60e7ea7fd0..09a828e33d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -30,6 +30,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.NativeConstants;
+import org.apache.hadoop.hdds.utils.NativeLibraryLoader;
+import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -41,9 +49,11 @@ import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.IntegerCodec;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -73,6 +83,8 @@ import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.SynchronousQueue;
+
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getSnapshotInfo;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
@@ -124,6 +136,9 @@ public class SnapshotDiffManager implements AutoCloseable {
private final PersistentMap<String, SnapshotDiffJob> snapDiffJobTable;
private final ExecutorService executorService;
+ private boolean isNativeRocksToolsLoaded;
+
+ private ManagedSSTDumpTool sstDumpTool;
public SnapshotDiffManager(ManagedRocksDB db,
RocksDBCheckpointDiffer differ,
@@ -170,6 +185,36 @@ public class SnapshotDiffManager implements AutoCloseable {
// TODO: [SNAPSHOT] Load jobs only if it is leader node.
// It could a event-triggered form OM when node is leader and up.
this.loadJobsOnStartUp();
+ isNativeRocksToolsLoaded = NativeLibraryLoader.getInstance()
+ .loadLibrary(NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+ if (isNativeRocksToolsLoaded) {
+ isNativeRocksToolsLoaded = initSSTDumpTool(
+ ozoneManager.getConfiguration());
+ }
+ }
+
+ private boolean initSSTDumpTool(OzoneConfiguration conf) {
+ try {
+ int threadPoolSize = conf.getInt(
+ OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE,
+ OMConfigKeys
+ .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT);
+ int bufferSize = (int) conf.getStorageSize(
+ OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE,
+ OMConfigKeys
+ .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ExecutorService execService = new ThreadPoolExecutor(0,
+ threadPoolSize, 60, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new ThreadFactoryBuilder()
+ .setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d")
+ .build(),
+ new ThreadPoolExecutor.DiscardPolicy());
+ sstDumpTool = new ManagedSSTDumpTool(execService, bufferSize);
+ } catch (NativeLibraryNotLoadedException e) {
+ return false;
+ }
+ return true;
}
private Map<String, String> getTablePrefixes(
@@ -209,6 +254,13 @@ public class SnapshotDiffManager implements AutoCloseable {
getTablePrefixes(snapshotOMMM, volumeName, bucketName));
}
+ private Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
+ List<String> tablesToLookUp) throws RocksDBException {
+ return RdbUtil.getSSTFilesForComparison(snapshot
+ .getMetadataManager().getStore().getDbLocation()
+ .getPath(), tablesToLookUp);
+ }
+
@SuppressWarnings("parameternumber")
public SnapshotDiffResponse getSnapshotDiffReport(
final String volume,
@@ -408,7 +460,6 @@ public class SnapshotDiffManager implements AutoCloseable {
ColumnFamilyHandle fromSnapshotColumnFamily = null;
ColumnFamilyHandle toSnapshotColumnFamily = null;
ColumnFamilyHandle objectIDsColumnFamily = null;
-
try {
// JobId is prepended to column families name to make them unique
// for request.
@@ -428,30 +479,19 @@ public class SnapshotDiffManager implements AutoCloseable
{
// Note: Store objectId and keyName as byte array to reduce unnecessary
// serialization and deserialization.
final PersistentMap<byte[], byte[]> objectIdToKeyNameMapForFromSnapshot =
- new RocksDbPersistentMap<>(db,
- fromSnapshotColumnFamily,
- codecRegistry,
- byte[].class,
- byte[].class);
-
+ new RocksDbPersistentMap<>(db, fromSnapshotColumnFamily,
+ codecRegistry, byte[].class, byte[].class);
// ObjectId to keyName map to keep key info for toSnapshot.
final PersistentMap<byte[], byte[]> objectIdToKeyNameMapForToSnapshot =
- new RocksDbPersistentMap<>(db,
- toSnapshotColumnFamily,
- codecRegistry,
- byte[].class,
- byte[].class);
-
+ new RocksDbPersistentMap<>(db, toSnapshotColumnFamily, codecRegistry,
+ byte[].class, byte[].class);
// Set of unique objectId between fromSnapshot and toSnapshot.
final PersistentSet<byte[]> objectIDsToCheckMap =
- new RocksDbPersistentSet<>(db,
- objectIDsColumnFamily,
- codecRegistry,
+ new RocksDbPersistentSet<>(db, objectIDsColumnFamily, codecRegistry,
byte[].class);
final BucketLayout bucketLayout = getBucketLayout(volume, bucket,
fromSnapshot.getMetadataManager());
-
final Table<String, OmKeyInfo> fsKeyTable =
fromSnapshot.getMetadataManager().getKeyTable(bucketLayout);
final Table<String, OmKeyInfo> tsKeyTable =
@@ -466,40 +506,82 @@ public class SnapshotDiffManager implements AutoCloseable
{
Map<String, String> tablePrefixes =
getTablePrefixes(toSnapshot.getMetadataManager(), volume, bucket);
-
- final Set<String> deltaFilesForKeyOrFileTable =
- getDeltaFiles(fromSnapshot, toSnapshot,
- Collections.singletonList(fsKeyTable.getName()), fsInfo, tsInfo,
+ List<String> tablesToLookUp =
+ Collections.singletonList(fsKeyTable.getName());
+ final Set<String> deltaFilesForKeyOrFileTable = getDeltaFiles(
+ fromSnapshot, toSnapshot, tablesToLookUp, fsInfo, tsInfo,
useFullDiff, tablePrefixes);
- addToObjectIdMap(fsKeyTable,
- tsKeyTable,
- deltaFilesForKeyOrFileTable,
- objectIdToKeyNameMapForFromSnapshot,
- objectIdToKeyNameMapForToSnapshot,
- objectIDsToCheckMap,
- tablePrefixes);
+ // Workaround to handle deletes if native rockstools for reading
+ // tombstone is not loaded.
+ // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+ if (!isNativeRocksToolsLoaded) {
+ deltaFilesForKeyOrFileTable.addAll(getSSTFileListForSnapshot(
+ fromSnapshot, tablesToLookUp));
+ }
+ try {
+ addToObjectIdMap(fsKeyTable, tsKeyTable,
+ Pair.of(isNativeRocksToolsLoaded, deltaFilesForKeyOrFileTable),
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+ tablePrefixes);
+ } catch (NativeLibraryNotLoadedException e) {
+ // Workaround to handle deletes if use of native rockstools for reading
+ // tombstone fails.
+ // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+ deltaFilesForKeyOrFileTable.addAll(getSSTFileListForSnapshot(
+ fromSnapshot, tablesToLookUp));
+ try {
+ addToObjectIdMap(fsKeyTable, tsKeyTable,
+ Pair.of(false, deltaFilesForKeyOrFileTable),
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+ tablePrefixes);
+ } catch (NativeLibraryNotLoadedException ex) {
+ // Should be unreachable: the retry passes false, so only the
+ // non-native getKeyStream() path is used.
+ throw new IllegalStateException(ex);
+ }
+ }
if (bucketLayout.isFileSystemOptimized()) {
final Table<String, OmDirectoryInfo> fsDirTable =
fromSnapshot.getMetadataManager().getDirectoryTable();
final Table<String, OmDirectoryInfo> tsDirTable =
toSnapshot.getMetadataManager().getDirectoryTable();
+ tablesToLookUp = Collections.singletonList(fsDirTable.getName());
final Set<String> deltaFilesForDirTable =
- getDeltaFiles(fromSnapshot, toSnapshot,
- Collections.singletonList(fsDirTable.getName()), fsInfo,
tsInfo,
- useFullDiff, tablePrefixes);
- addToObjectIdMap(fsDirTable,
- tsDirTable,
- deltaFilesForDirTable,
- objectIdToKeyNameMapForFromSnapshot,
- objectIdToKeyNameMapForToSnapshot,
- objectIDsToCheckMap,
- tablePrefixes);
+ getDeltaFiles(fromSnapshot, toSnapshot, tablesToLookUp, fsInfo,
+ tsInfo, useFullDiff, tablePrefixes);
+ if (!isNativeRocksToolsLoaded) {
+ deltaFilesForDirTable.addAll(getSSTFileListForSnapshot(
+ fromSnapshot, tablesToLookUp));
+ }
+ try {
+ addToObjectIdMap(fsDirTable, tsDirTable,
+ Pair.of(isNativeRocksToolsLoaded, deltaFilesForDirTable),
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot,
+ objectIDsToCheckMap,
+ tablePrefixes);
+ } catch (NativeLibraryNotLoadedException e) {
+ try {
+ // Workaround to handle deletes if use of native rockstools for
+ // reading tombstone fails.
+ // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+ deltaFilesForDirTable.addAll(getSSTFileListForSnapshot(
+ fromSnapshot, tablesToLookUp));
+ addToObjectIdMap(fsDirTable, tsDirTable,
+ Pair.of(false, deltaFilesForDirTable),
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+ tablePrefixes);
+ } catch (NativeLibraryNotLoadedException ex) {
+ // Should be unreachable: the retry passes false, so only the
+ // non-native getKeyStream() path is used.
+ throw new IllegalStateException(ex);
+ }
+ }
}
-
- generateDiffReport(jobId,
- objectIDsToCheckMap,
+ generateDiffReport(jobId, objectIDsToCheckMap,
objectIdToKeyNameMapForFromSnapshot,
objectIdToKeyNameMapForToSnapshot);
updateJobStatus(jobKey, IN_PROGRESS, DONE);
@@ -517,22 +599,26 @@ public class SnapshotDiffManager implements AutoCloseable
{
private void addToObjectIdMap(Table<String, ? extends WithObjectID> fsTable,
Table<String, ? extends WithObjectID> tsTable,
- Set<String> deltaFiles,
+ Pair<Boolean, Set<String>>
+ isNativeRocksToolsLoadedDeltaFilesPair,
PersistentMap<byte[], byte[]> oldObjIdToKeyMap,
PersistentMap<byte[], byte[]> newObjIdToKeyMap,
PersistentSet<byte[]> objectIDsToCheck,
Map<String, String> tablePrefixes)
- throws IOException {
+ throws IOException, NativeLibraryNotLoadedException {
+ Set<String> deltaFiles = isNativeRocksToolsLoadedDeltaFilesPair.getRight();
if (deltaFiles.isEmpty()) {
return;
}
-
+ boolean nativeRocksToolsLoaded =
+ isNativeRocksToolsLoadedDeltaFilesPair.getLeft();
boolean isDirectoryTable =
fsTable.getName().equals(OmMetadataManagerImpl.DIRECTORY_TABLE);
-
- try (Stream<String> keysToCheck = new ManagedSstFileReader(deltaFiles)
- .getKeyStream()) {
+ ManagedSstFileReader sstFileReader = new ManagedSstFileReader(deltaFiles);
+ try (Stream<String> keysToCheck = nativeRocksToolsLoaded
+ ? sstFileReader.getKeyStreamWithTombstone(sstDumpTool)
+ : sstFileReader.getKeyStream()) {
keysToCheck.forEach(key -> {
try {
final WithObjectID oldKey = fsTable.get(key);
@@ -602,19 +688,6 @@ public class SnapshotDiffManager implements AutoCloseable {
List<String> sstDiffList =
differ.getSSTDiffListWithFullPath(toDSI, fromDSI);
deltaFiles.addAll(sstDiffList);
-
- // TODO: [SNAPSHOT] Remove the workaround below when the SnapDiff logic
- // can read tombstones in SST files.
- // Workaround: Append "From DB" SST files to the deltaFiles list so that
- // the current SnapDiff logic correctly handles deleted keys.
- if (!deltaFiles.isEmpty()) {
- Set<String> fromSnapshotFiles = RdbUtil.getSSTFilesForComparison(
- fromSnapshot.getMetadataManager()
- .getStore().getDbLocation().getPath(),
- tablesToLookUp);
- deltaFiles.addAll(fromSnapshotFiles);
- }
- // End of Workaround
}
if (useFullDiff || deltaFiles.isEmpty()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]