This is an automated email from the ASF dual-hosted git repository.
prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new beec178285 HDDS-8306. [Snapshot] Fix SSTDumptool to allow null
character & add tests for SST dump tool (#4584)
beec178285 is described below
commit beec17828524cb53ccf496e23f9e0cb93b3cf7a0
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu May 18 11:14:57 2023 -0700
HDDS-8306. [Snapshot] Fix SSTDumptool to allow null character & add tests
for SST dump tool (#4584)
* HDDS-8306. [Snapshot] Add tests for SST dump tool
* HDDS-8306. Address review comments
* HDDS-8306. Fix Failures
* HDDS-8306. Fix checkstyle
* HDDS-8306. Address review comments
* HDDS-8306. Address review comments
* HDDS-8306. Address review comments
* HDDS-8306. Address review comments
* HDDS-8306. Address review comments
* HDDS-8306. Address review comments
* HDDS-8306. Fix checkstyle
* HDDS-8306. Address review comments
* HDDS-8306. Address review comments
---
hadoop-hdds/rocks-native/pom.xml | 53 ++++-
.../hadoop/hdds/utils/NativeLibraryLoader.java | 16 +-
.../utils/db/managed/ManagedSSTDumpIterator.java | 189 ++++++++--------
.../hdds/utils/db/managed/ManagedSSTDumpTool.java | 7 +-
.../hdds/utils/db/managed/PipeInputStream.java | 6 +-
.../rocks-native/src/main/native/SSTDumpTool.cpp | 4 +-
.../src/main/patches/rocks-native.patch | 229 ++++++++++---------
.../hadoop/hdds/utils/db/managed/Native.java | 40 ++++
.../db/managed/TestManagedSSTDumpIterator.java | 242 +++++++++++++++++++++
.../ozone/rocksdb/util/ManagedSstFileReader.java | 19 +-
10 files changed, 584 insertions(+), 221 deletions(-)
diff --git a/hadoop-hdds/rocks-native/pom.xml b/hadoop-hdds/rocks-native/pom.xml
index 9553426809..aad40b2738 100644
--- a/hadoop-hdds/rocks-native/pom.xml
+++ b/hadoop-hdds/rocks-native/pom.xml
@@ -33,6 +33,26 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-annotations</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
@@ -265,7 +285,7 @@
</execution>
<execution>
<id>build-rocksjava</id>
- <phase>process-resources</phase>
+ <phase>generate-resources</phase>
<configuration>
<tasks>
<exec executable="chmod"
failonerror="true">
@@ -317,7 +337,7 @@
</execution>
<execution>
<id>copy-lib-file</id>
- <phase>prepare-package</phase>
+ <phase>process-classes</phase>
<configuration>
<tasks>
<copy
toDir="${project.build.outputDirectory}">
@@ -345,6 +365,14 @@
</includes>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <groups>native</groups>
+ <argLine>${maven-surefire-plugin.argLine}
@{argLine}
-Djava.library.path=${project.build.directory}/native/rocksdb</argLine>
+ </configuration>
+ </plugin>
</plugins>
</build>
</profile>
@@ -397,7 +425,7 @@
<executions>
<execution>
<id>copy-jars</id>
- <phase>process-resources</phase>
+ <phase>process-sources</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
@@ -436,5 +464,24 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>disable-native-testing</id>
+ <activation>
+ <property>
+ <name>!rocks_tools_native</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludedGroups>native</excludedGroups>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
diff --git
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java
index 35427c822f..da0252a090 100644
---
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java
+++
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.utils;
+import org.apache.hadoop.ozone.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class NativeLibraryLoader {
private static final Logger LOG =
- LoggerFactory.getLogger(NativeLibraryLoader.class);
+ LoggerFactory.getLogger(NativeLibraryLoader.class);
+ public static final int LIBRARY_SHUTDOWN_HOOK_PRIORITY = 1;
private static final String OS = System.getProperty("os.name").toLowerCase();
private Map<String, Boolean> librariesLoaded;
private static volatile NativeLibraryLoader instance;
@@ -91,13 +93,14 @@ public class NativeLibraryLoader {
public static boolean isLibraryLoaded(final String libraryName) {
return getInstance().librariesLoaded
- .getOrDefault(libraryName, false);
+ .getOrDefault(libraryName, false);
}
public synchronized boolean loadLibrary(final String libraryName) {
if (isLibraryLoaded(libraryName)) {
return true;
}
+ LOG.info("Loading Library: {}", libraryName);
boolean loaded = false;
try {
loaded = false;
@@ -122,7 +125,7 @@ public class NativeLibraryLoader {
}
private Optional<File> copyResourceFromJarToTemp(final String libraryName)
- throws IOException {
+ throws IOException {
final String libraryFileName = getJniLibraryFileName(libraryName);
InputStream is = null;
try {
@@ -132,7 +135,8 @@ public class NativeLibraryLoader {
}
// create a temporary file to copy the library to
- final File temp = File.createTempFile(libraryName, getLibOsSuffix());
+ final File temp = File.createTempFile(libraryName, getLibOsSuffix(),
+ new File(""));
if (!temp.exists()) {
return Optional.empty();
} else {
@@ -140,7 +144,9 @@ public class NativeLibraryLoader {
}
Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING);
- return Optional.ofNullable(temp);
+ ShutdownHookManager.get().addShutdownHook(temp::delete,
+ LIBRARY_SHUTDOWN_HOOK_PRIORITY);
+ return Optional.of(temp);
} finally {
if (is != null) {
is.close();
diff --git
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
index af54208cb0..4fa578c4db 100644
---
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
+++
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
@@ -17,23 +17,23 @@
package org.apache.hadoop.hdds.utils.db.managed;
+import com.google.common.primitives.UnsignedLong;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.util.ClosableIterator;
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.InputStream;
import java.io.UncheckedIOException;
-import java.nio.charset.StandardCharsets;
+import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@@ -43,21 +43,16 @@ public abstract class ManagedSSTDumpIterator<T> implements
ClosableIterator<T> {
private static final Logger LOG =
LoggerFactory.getLogger(ManagedSSTDumpIterator.class);
- // Since we don't have any restriction on the key, we are prepending
+ // Since we don't have any restriction on the key & value, we are prepending
// the length of the pattern in the sst dump tool output.
// The first token in the pattern is the key.
// The second tells the sequence number of the key.
// The third token gives the type of key in the sst file.
- private static final String PATTERN_REGEX =
- "'([\\s\\S]+)' seq:([0-9]+), type:([0-9]+)";
- public static final int PATTERN_KEY_GROUP_NUMBER = 1;
- public static final int PATTERN_SEQ_GROUP_NUMBER = 2;
- public static final int PATTERN_TYPE_GROUP_NUMBER = 3;
- private static final Pattern PATTERN_MATCHER =
Pattern.compile(PATTERN_REGEX);
- private BufferedReader processOutput;
- private KeyValue currentKey;
- private char[] charBuffer;
- private KeyValue nextKey;
+ // The fourth token
+ private InputStream processOutput;
+ private Optional<KeyValue> currentKey;
+ private byte[] intBuffer;
+ private Optional<KeyValue> nextKey;
private ManagedSSTDumpTool.SSTDumpToolTask sstDumpToolTask;
private AtomicBoolean open;
@@ -86,51 +81,61 @@ public abstract class ManagedSSTDumpIterator<T> implements
ClosableIterator<T> {
* @return Optional of the integer empty if no integer exists
*/
private Optional<Integer> getNextNumberInStream() throws IOException {
- StringBuilder value = new StringBuilder();
- int val;
- while ((val = processOutput.read()) != -1) {
- if (val >= '0' && val <= '9') {
- value.append((char) val);
- } else if (value.length() > 0) {
- break;
- }
+ int n = processOutput.read(intBuffer, 0, 4);
+ if (n == 4) {
+ return Optional.of(ByteBuffer.wrap(intBuffer).getInt());
+ } else if (n >= 0) {
+ throw new IllegalStateException(String.format("Integer expects " +
+ "4 bytes to be read from the stream, but read only %d bytes", n));
}
- return value.length() > 0 ? Optional.of(Integer.valueOf(value.toString()))
- : Optional.empty();
+ return Optional.empty();
}
- /**
- * Reads the next n chars from the stream & makes a string.
- *
- * @param numberOfChars
- * @return String of next chars read
- * @throws IOException
- */
- private String readNextNumberOfCharsFromStream(int numberOfChars)
- throws IOException {
- StringBuilder value = new StringBuilder();
- while (numberOfChars > 0) {
- int noOfCharsRead = processOutput.read(charBuffer, 0,
- Math.min(numberOfChars, charBuffer.length));
- if (noOfCharsRead == -1) {
- break;
+ private Optional<byte[]> getNextByteArray() throws IOException {
+ Optional<Integer> size = getNextNumberInStream();
+ if (size.isPresent()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Allocating byte array, size: {}", size.get());
}
- value.append(charBuffer, 0, noOfCharsRead);
- numberOfChars -= noOfCharsRead;
+ byte[] b = new byte[size.get()];
+ int n = processOutput.read(b);
+ if (n >= 0 && n != size.get()) {
+ throw new IllegalStateException(String.format("Integer expects " +
+ "4 bytes to be read from the stream, but read only %d bytes", n));
+ }
+ return Optional.of(b);
}
+ return Optional.empty();
+ }
- return value.toString();
+ private Optional<UnsignedLong> getNextUnsignedLong() throws IOException {
+ long val = 0;
+ for (int i = 0; i < 8; i++) {
+ val = val << 8;
+ int nextByte = processOutput.read();
+ if (nextByte < 0) {
+ if (i == 0) {
+ return Optional.empty();
+ }
+ throw new IllegalStateException(String.format("Long expects " +
+ "8 bytes to be read from the stream, but read only %d bytes", i));
+ }
+ val += nextByte;
+ }
+ return Optional.of(UnsignedLong.fromLongBits(val));
}
private void init(ManagedSSTDumpTool sstDumpTool, File sstFile,
ManagedOptions options)
throws NativeLibraryNotLoadedException {
- String[] args = {"--file=" + sstFile.getAbsolutePath(), "--command=scan"};
+ String[] args = {"--file=" + sstFile.getAbsolutePath(), "--command=scan",
+ "--silent"};
this.sstDumpToolTask = sstDumpTool.run(args, options);
- processOutput = new BufferedReader(new InputStreamReader(
- sstDumpToolTask.getPipedOutput(), StandardCharsets.UTF_8));
- charBuffer = new char[8192];
+ processOutput = sstDumpToolTask.getPipedOutput();
+ intBuffer = new byte[4];
open = new AtomicBoolean(true);
+ currentKey = Optional.empty();
+ nextKey = Optional.empty();
next();
}
@@ -159,7 +164,7 @@ public abstract class ManagedSSTDumpIterator<T> implements
ClosableIterator<T> {
@Override
public boolean hasNext() {
checkSanityOfProcess();
- return nextKey != null;
+ return nextKey.isPresent();
}
/**
@@ -168,7 +173,7 @@ public abstract class ManagedSSTDumpIterator<T> implements
ClosableIterator<T> {
* @param value
* @return transformed Value
*/
- protected abstract T getTransformedValue(KeyValue value);
+ protected abstract T getTransformedValue(Optional<KeyValue> value);
/**
* Returns the next record from SSTDumpTool.
@@ -180,33 +185,33 @@ public abstract class ManagedSSTDumpIterator<T>
implements ClosableIterator<T> {
public T next() {
checkSanityOfProcess();
currentKey = nextKey;
- nextKey = null;
- boolean keyFound = false;
- while (!keyFound) {
- try {
- Optional<Integer> keyLength = getNextNumberInStream();
- if (!keyLength.isPresent()) {
- return getTransformedValue(currentKey);
- }
- String keyStr = readNextNumberOfCharsFromStream(keyLength.get());
- Matcher matcher = PATTERN_MATCHER.matcher(keyStr);
- if (keyStr.length() == keyLength.get() && matcher.find()) {
- Optional<Integer> valueLength = getNextNumberInStream();
- if (valueLength.isPresent()) {
- String valueStr = readNextNumberOfCharsFromStream(
- valueLength.get());
- if (valueStr.length() == valueLength.get()) {
- keyFound = true;
- nextKey = new KeyValue(matcher.group(PATTERN_KEY_GROUP_NUMBER),
- matcher.group(PATTERN_SEQ_GROUP_NUMBER),
- matcher.group(PATTERN_TYPE_GROUP_NUMBER),
- valueStr);
- }
- }
- }
- } catch (IOException e) {
- throw new RuntimeIOException(e);
+ nextKey = Optional.empty();
+ try {
+ Optional<byte[]> key = getNextByteArray();
+ if (!key.isPresent()) {
+ return getTransformedValue(currentKey);
}
+ UnsignedLong sequenceNumber = getNextUnsignedLong()
+ .orElseThrow(() -> new IllegalStateException(
+ String.format("Error while trying to read sequence number" +
+ " for key %s", StringUtils.bytes2String(key.get()))));
+
+ Integer type = getNextNumberInStream()
+ .orElseThrow(() -> new IllegalStateException(
+ String.format("Error while trying to read sequence number for " +
+ "key %s with sequence number %s",
+ StringUtils.bytes2String(key.get()),
+ sequenceNumber.toString())));
+ byte[] val = getNextByteArray().orElseThrow(() ->
+ new IllegalStateException(
+ String.format("Error while trying to read sequence number for " +
+ "key %s with sequence number %s of type %d",
+ StringUtils.bytes2String(key.get()),
+ sequenceNumber.toString(), type)));
+ nextKey = Optional.of(new KeyValue(key.get(), sequenceNumber, type,
val));
+ } catch (IOException e) {
+ // TODO [SNAPSHOT] Throw custom snapshot exception
+ throw new RuntimeIOException(e);
}
return getTransformedValue(currentKey);
}
@@ -244,25 +249,26 @@ public abstract class ManagedSSTDumpIterator<T>
implements ClosableIterator<T> {
* Class containing Parsed KeyValue Record from Sst Dumptool output.
*/
public static final class KeyValue {
- private String key;
- private Integer sequence;
- private Integer type;
- private String value;
+ private final byte[] key;
+ private final UnsignedLong sequence;
+ private final Integer type;
+ private final byte[] value;
- private KeyValue(String key, String sequence, String type,
- String value) {
+ private KeyValue(byte[] key, UnsignedLong sequence, Integer type,
+ byte[] value) {
this.key = key;
- this.sequence = Integer.valueOf(sequence);
- this.type = Integer.valueOf(type);
+ this.sequence = sequence;
+ this.type = type;
this.value = value;
}
- public String getKey() {
+ @SuppressFBWarnings("EI_EXPOSE_REP")
+ public byte[] getKey() {
return key;
}
- public Integer getSequence() {
+ public UnsignedLong getSequence() {
return sequence;
}
@@ -270,14 +276,19 @@ public abstract class ManagedSSTDumpIterator<T>
implements ClosableIterator<T> {
return type;
}
- public String getValue() {
+ @SuppressFBWarnings("EI_EXPOSE_REP")
+ public byte[] getValue() {
return value;
}
@Override
public String toString() {
- return "KeyValue{" + "key='" + key + '\'' + ", sequence=" + sequence +
- ", type=" + type + ", value='" + value + '\'' + '}';
+ return "KeyValue{" +
+ "key=" + StringUtils.bytes2String(key) +
+ ", sequence=" + sequence +
+ ", type=" + type +
+ ", value=" + StringUtils.bytes2String(value) +
+ '}';
}
}
}
diff --git
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
index 649378fad6..0c601b7c1d 100644
---
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
+++
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.utils.db.managed;
import org.apache.hadoop.hdds.utils.NativeLibraryLoader;
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
+import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -75,9 +76,9 @@ public class ManagedSSTDumpTool {
*/
static class SSTDumpToolTask {
private Future<Integer> future;
- private PipeInputStream pipedOutput;
+ private InputStream pipedOutput;
- SSTDumpToolTask(Future<Integer> future, PipeInputStream pipedOutput) {
+ SSTDumpToolTask(Future<Integer> future, InputStream pipedOutput) {
this.future = future;
this.pipedOutput = pipedOutput;
}
@@ -86,7 +87,7 @@ public class ManagedSSTDumpTool {
return future;
}
- public PipeInputStream getPipedOutput() {
+ public InputStream getPipedOutput() {
return pipedOutput;
}
diff --git
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/PipeInputStream.java
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/PipeInputStream.java
index 741761ae49..54d653b6bf 100644
---
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/PipeInputStream.java
+++
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/PipeInputStream.java
@@ -65,11 +65,13 @@ public class PipeInputStream extends InputStream {
this.close();
return -1;
}
- if (numberOfBytesLeftToRead == 0) {
+ while (numberOfBytesLeftToRead == 0) {
numberOfBytesLeftToRead = readInternal(byteBuffer, capacity,
nativeHandle);
index = 0;
- return read();
+ if (numberOfBytesLeftToRead != 0) {
+ return read();
+ }
}
numberOfBytesLeftToRead--;
int ret = byteBuffer[index] & 0xFF;
diff --git a/hadoop-hdds/rocks-native/src/main/native/SSTDumpTool.cpp
b/hadoop-hdds/rocks-native/src/main/native/SSTDumpTool.cpp
index b200f49c0b..285c5906c2 100644
--- a/hadoop-hdds/rocks-native/src/main/native/SSTDumpTool.cpp
+++ b/hadoop-hdds/rocks-native/src/main/native/SSTDumpTool.cpp
@@ -27,7 +27,7 @@
jint
Java_org_apache_hadoop_hdds_utils_db_managed_ManagedSSTDumpTool_runInternal(JNIEnv
*env, jobject obj,
jobjectArray argsArray, jlong optionsHandle, jlong pipeHandle) {
ROCKSDB_NAMESPACE::SSTDumpTool dumpTool;
- ROCKSDB_NAMESPACE::Options *options =
reinterpret_cast<ROCKSDB_NAMESPACE::Options *>(optionsHandle);
+ ROCKSDB_NAMESPACE::Options options;
Pipe *pipe = reinterpret_cast<Pipe *>(pipeHandle);
int length = env->GetArrayLength(argsArray);
char *args[length + 1];
@@ -37,7 +37,7 @@ jint
Java_org_apache_hadoop_hdds_utils_db_managed_ManagedSSTDumpTool_runInternal
args[i + 1] = utf_str;
}
FILE *wr = fdopen(pipe->getWriteFd(), "w");
- int ret = dumpTool.Run(length + 1, args, *options, wr);
+ int ret = dumpTool.Run(length + 1, args, options, wr);
for (int i = 1; i < length + 1; i++) {
jstring str_val = (jstring)env->GetObjectArrayElement(argsArray,
(jsize)(i - 1));
env->ReleaseStringUTFChars(str_val, args[i]);
diff --git a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
index 559d21495a..2da7aa6c47 100644
--- a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
+++ b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
@@ -17,7 +17,7 @@
*/
diff --git a/include/rocksdb/sst_dump_tool.h b/include/rocksdb/sst_dump_tool.h
-index 9261ba47d..09ed123e5 100644
+index 9261ba47d..1e62b88a3 100644
--- a/include/rocksdb/sst_dump_tool.h
+++ b/include/rocksdb/sst_dump_tool.h
@@ -11,7 +11,8 @@ namespace ROCKSDB_NAMESPACE {
@@ -25,13 +25,13 @@ index 9261ba47d..09ed123e5 100644
class SSTDumpTool {
public:
- int Run(int argc, char const* const* argv, Options options = Options());
-+ int Run(int argc, char const* const* argv,Options options = Options(),
++ int Run(int argc, char const* const* argv, Options options = Options(),
+ FILE* out = stdout, FILE* err = stderr);
};
} // namespace ROCKSDB_NAMESPACE
diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc
-index eefbaaeee..503ec7aa6 100644
+index eefbaaeee..98d37dd5c 100644
--- a/table/sst_file_dumper.cc
+++ b/table/sst_file_dumper.cc
@@ -45,7 +45,7 @@ SstFileDumper::SstFileDumper(const Options& options,
@@ -55,7 +55,7 @@ index eefbaaeee..503ec7aa6 100644
read_options_.readahead_size = readahead_size;
if (!silent_) {
- fprintf(stdout, "Process %s\n", file_path.c_str());
-+ fprintf(out, "Process %s\n", file_path.c_str());
++ fprintf(out_, "Process %s\n", file_path.c_str());
}
init_result_ = GetTableReader(file_name_);
}
@@ -85,7 +85,7 @@ index eefbaaeee..503ec7aa6 100644
}
} else {
- fprintf(stdout, "Unsupported compression type: %s.\n", i.second);
-+ fprintf(out_, "Unsupported compression type: %s.\n", i.second);
++ fprintf(err_, "Unsupported compression type: %s.\n", i.second);
}
}
return Status::OK();
@@ -122,19 +122,10 @@ index eefbaaeee..503ec7aa6 100644
if (!s.ok()) {
if (!silent_) {
- fprintf(stdout, "Not able to read table properties\n");
-+ fprintf(out_, "Not able to read table properties\n");
++ fprintf(err_, "Not able to read table properties\n");
}
}
return s;
-@@ -382,7 +385,7 @@ Status SstFileDumper::SetTableOptionsByMagicNumber(
-
- options_.table_factory.reset(bbtf);
- if (!silent_) {
-- fprintf(stdout, "Sst file format: block-based\n");
-+ fprintf(out_, "Sst file format: block-based\n");
- }
-
- auto& props = table_properties_->user_collected_properties;
@@ -410,7 +413,7 @@ Status SstFileDumper::SetTableOptionsByMagicNumber(
options_.table_factory.reset(NewPlainTableFactory(plain_table_options));
@@ -144,7 +135,7 @@ index eefbaaeee..503ec7aa6 100644
}
} else {
char error_msg_buffer[80];
-@@ -427,7 +430,7 @@ Status SstFileDumper::SetOldTableOptions() {
+@@ -427,12 +430,53 @@ Status SstFileDumper::SetOldTableOptions() {
assert(table_properties_ == nullptr);
options_.table_factory = std::make_shared<BlockBasedTableFactory>();
if (!silent_) {
@@ -153,36 +144,82 @@ index eefbaaeee..503ec7aa6 100644
}
return Status::OK();
-@@ -478,21 +481,26 @@ Status SstFileDumper::ReadSequential(bool print_kv,
uint64_t read_num,
+ }
+
++void write(int value, FILE* file) {
++ char b[4];
++ b[3] = value & 0x000000ff;
++ b[2] = (value & 0x0000ff00) >> 8;
++ b[1] = (value & 0x00ff0000) >> 16;
++ b[0] = (value & 0xff000000) >> 24;
++ std::fwrite(b, 4, 1, file);
++}
++
++void write(const char* value, int length, FILE* file) {
++ write(length, file);
++ fwrite(value, length, 1, file);
++}
++
++void write(const std::string& value, FILE* file) {
++ write(value.data(), (int)value.length(), file);
++}
++
++void write(Slice &slice, FILE* file) {
++ int size = (int)slice.size();
++ write(slice.data(), size, file);
++}
++
++void write(SequenceNumber sequenceNumber, FILE* file) {
++
++ char b[8];
++ int idx = 7;
++ while (idx >= 0) {
++ b[idx] = sequenceNumber % 256;
++ sequenceNumber /= 256;
++ idx -= 1;
++ }
++ fwrite(b, 8, 1, file);
++}
++
++void write(ParsedInternalKey &key, FILE* file) {
++ write(key.user_key, file);
++ write(key.sequence, file);
++ write(static_cast<int>(key.type), file);
++}
++
+ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num,
+ bool has_from, const std::string&
from_key,
+ bool has_to, const std::string& to_key,
+@@ -478,22 +522,19 @@ Status SstFileDumper::ReadSequential(bool print_kv,
uint64_t read_num,
if (print_kv) {
if (!decode_blob_index_ || ikey.type != kTypeBlobIndex) {
- fprintf(stdout, "%s => %s\n",
-+ fprintf(out_, "%lu %s => %lu %s\n",
-+ ikey.DebugString(true, output_hex_).length(),
- ikey.DebugString(true, output_hex_).c_str(),
-+ value.ToString(output_hex_).length(),
- value.ToString(output_hex_).c_str());
+- ikey.DebugString(true, output_hex_).c_str(),
+- value.ToString(output_hex_).c_str());
++ write(ikey, out_);
++ write(value, out_);
} else {
BlobIndex blob_index;
-
+-
const Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
- fprintf(stderr, "%s => error decoding blob index\n",
-+ fprintf(err_, "%lu %s => 25 error decoding blob index\n",
-+ ikey.DebugString(true, output_hex_).length(),
- ikey.DebugString(true, output_hex_).c_str());
+- ikey.DebugString(true, output_hex_).c_str());
++ write(ikey, err_);
++ write("error decoding blob index", err_);
continue;
}
-
+-
- fprintf(stdout, "%s => %s\n",
-+ fprintf(out_, "%lu %s => %lu %s\n",
-+ ikey.DebugString(true, output_hex_).length(),
- ikey.DebugString(true, output_hex_).c_str(),
-+ blob_index.DebugString(output_hex_).length(),
- blob_index.DebugString(output_hex_).c_str());
+- ikey.DebugString(true, output_hex_).c_str(),
+- blob_index.DebugString(output_hex_).c_str());
++ write(ikey, out_);
++ std::string v = blob_index.DebugString(output_hex_);
++ write(v, out_);
}
}
+ }
diff --git a/table/sst_file_dumper.h b/table/sst_file_dumper.h
index 7be876390..ea07154da 100644
--- a/table/sst_file_dumper.h
@@ -208,7 +245,7 @@ index 7be876390..ea07154da 100644
} // namespace ROCKSDB_NAMESPACE
diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc
-index 7053366e7..4bde14fff 100644
+index 7053366e7..75720a12d 100644
--- a/tools/sst_dump_tool.cc
+++ b/tools/sst_dump_tool.cc
@@ -31,7 +31,7 @@ static const std::vector<std::pair<CompressionType, const
char*>>
@@ -216,7 +253,7 @@ index 7053366e7..4bde14fff 100644
namespace {
-void print_help(bool to_stderr) {
-+void print_help(bool to_stderr, FILE* out, FILE* err) {
++void print_help(bool to_stderr, FILE* err_, FILE* out_) {
std::string supported_compressions;
for (CompressionType ct : GetSupportedCompressions()) {
if (!supported_compressions.empty()) {
@@ -225,7 +262,7 @@ index 7053366e7..4bde14fff 100644
}
fprintf(
- to_stderr ? stderr : stdout,
-+ to_stderr ? err : out,
++ to_stderr ? err_ : out_,
R"(sst_dump --file=<data_dir_OR_sst_file>
[--command=check|scan|raw|recompress|identify]
--file=<data_dir_OR_sst_file>
Path to SST file or directory containing SST files
@@ -239,16 +276,26 @@ index 7053366e7..4bde14fff 100644
std::string env_uri, fs_uri;
const char* dir_or_file = nullptr;
uint64_t read_num = std::numeric_limits<uint64_t>::max();
-@@ -248,7 +249,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
- return curr.second == compression_type;
- });
- if (iter == kCompressions.end()) {
-- fprintf(stderr, "%s is not a valid CompressionType\n",
-+ fprintf(err, "%s is not a valid CompressionType\n",
- compression_type.c_str());
- exit(1);
- }
-@@ -273,7 +274,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -170,6 +171,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+ bool has_compression_level_from = false;
+ bool has_compression_level_to = false;
+ bool has_specified_compression_types = false;
++ bool silent = false;
+ std::string from_key;
+ std::string to_key;
+ std::string block_size_str;
+@@ -197,7 +199,9 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+ int64_t tmp_val;
+
+ for (int i = 1; i < argc; i++) {
+- if (strncmp(argv[i], "--env_uri=", 10) == 0) {
++ if (strncmp(argv[i], "--silent", 8) == 0) {
++ silent = true;
++ } else if (strncmp(argv[i], "--env_uri=", 10) == 0) {
+ env_uri = argv[i] + 10;
+ } else if (strncmp(argv[i], "--fs_uri=", 9) == 0) {
+ fs_uri = argv[i] + 9;
+@@ -273,7 +277,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
std::cerr << pik_status.getState() << "\n";
retc = -1;
}
@@ -257,7 +304,7 @@ index 7053366e7..4bde14fff 100644
return retc;
} else if (ParseIntArg(argv[i], "--compression_level_from=",
"compression_level_from must be numeric",
-@@ -288,9 +289,9 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -288,9 +292,9 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
"compression_max_dict_bytes must be numeric",
&tmp_val)) {
if (tmp_val < 0 || tmp_val > std::numeric_limits<uint32_t>::max()) {
@@ -265,11 +312,11 @@ index 7053366e7..4bde14fff 100644
+ fprintf(err, "compression_max_dict_bytes must be a uint32_t: '%s'\n",
argv[i]);
- print_help(/*to_stderr*/ true);
-+ print_help(/*to_stderr*/ true, out, err);
++ print_help(/*to_stderr*/ true, err, out);
return 1;
}
compression_max_dict_bytes = static_cast<uint32_t>(tmp_val);
-@@ -298,10 +299,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -298,10 +302,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
"compression_zstd_max_train_bytes must be numeric",
&tmp_val)) {
if (tmp_val < 0 || tmp_val > std::numeric_limits<uint32_t>::max()) {
@@ -278,11 +325,11 @@ index 7053366e7..4bde14fff 100644
"compression_zstd_max_train_bytes must be a uint32_t: '%s'\n",
argv[i]);
- print_help(/*to_stderr*/ true);
-+ print_help(/*to_stderr*/ true, out, err);
++ print_help(/*to_stderr*/ true, err, out);
return 1;
}
compression_zstd_max_train_bytes = static_cast<uint32_t>(tmp_val);
-@@ -309,41 +310,41 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -309,41 +313,41 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
"compression_max_dict_buffer_bytes must be
numeric",
&tmp_val)) {
if (tmp_val < 0) {
@@ -291,7 +338,7 @@ index 7053366e7..4bde14fff 100644
"compression_max_dict_buffer_bytes must be positive: '%s'\n",
argv[i]);
- print_help(/*to_stderr*/ true);
-+ print_help(/*to_stderr*/ true, out, err);
++ print_help(/*to_stderr*/ true, err, out);
return 1;
}
compression_max_dict_buffer_bytes = static_cast<uint64_t>(tmp_val);
@@ -299,7 +346,7 @@ index 7053366e7..4bde14fff 100644
compression_use_zstd_finalize_dict = true;
} else if (strcmp(argv[i], "--help") == 0) {
- print_help(/*to_stderr*/ false);
-+ print_help(/*to_stderr*/ false, out, err);
++ print_help(/*to_stderr*/ false, err, out);
return 0;
} else if (strcmp(argv[i], "--version") == 0) {
printf("%s\n", GetRocksBuildInfoAsString("sst_dump").c_str());
@@ -308,7 +355,7 @@ index 7053366e7..4bde14fff 100644
- fprintf(stderr, "Unrecognized argument '%s'\n\n", argv[i]);
- print_help(/*to_stderr*/ true);
+ fprintf(err, "Unrecognized argument '%s'\n\n", argv[i]);
-+ print_help(/*to_stderr*/ true, out, err);
++ print_help(/*to_stderr*/ true, err, out);
return 1;
}
}
@@ -332,31 +379,32 @@ index 7053366e7..4bde14fff 100644
exit(1);
}
-@@ -357,8 +358,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -357,8 +361,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
}
if (dir_or_file == nullptr) {
- fprintf(stderr, "file or directory must be specified.\n\n");
- print_help(/*to_stderr*/ true);
+ fprintf(err, "file or directory must be specified.\n\n");
-+ print_help(/*to_stderr*/ true, out, err);
++ print_help(/*to_stderr*/ true, err, out);
exit(1);
}
-@@ -373,10 +374,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -373,10 +377,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
Status s = Env::CreateFromUri(config_options, env_uri, fs_uri,
&options.env,
&env_guard);
if (!s.ok()) {
- fprintf(stderr, "CreateEnvFromUri: %s\n", s.ToString().c_str());
+ fprintf(err, "CreateEnvFromUri: %s\n", s.ToString().c_str());
exit(1);
- } else {
+- } else {
- fprintf(stdout, "options.env is %p\n", options.env);
++ } else if (!silent){
+ fprintf(out, "options.env is %p\n", options.env);
}
}
-@@ -390,7 +391,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -390,7 +394,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
Status s = env->FileExists(dir_or_file);
// dir_or_file does not exist
if (!s.ok()) {
@@ -365,13 +413,13 @@ index 7053366e7..4bde14fff 100644
dir_or_file);
return 1;
}
-@@ -421,10 +422,11 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -421,10 +425,11 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
ROCKSDB_NAMESPACE::SstFileDumper dumper(
options, filename, Temperature::kUnknown, readahead_size,
- verify_checksum, output_hex, decode_blob_index);
+ verify_checksum, output_hex, decode_blob_index, EnvOptions(),
-+ false, out, err);
++ silent, out, err);
// Not a valid SST
if (!dumper.getStatus().ok()) {
- fprintf(stderr, "%s: %s\n", filename.c_str(),
@@ -379,16 +427,19 @@ index 7053366e7..4bde14fff 100644
dumper.getStatus().ToString().c_str());
continue;
} else {
-@@ -434,7 +436,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -433,8 +438,9 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+ // where there is at least one valid SST
if (valid_sst_files.size() == 1) {
// from_key and to_key are only used for "check", "scan", or ""
- if (command == "check" || command == "scan" || command == "") {
+- if (command == "check" || command == "scan" || command == "") {
- fprintf(stdout, "from [%s] to [%s]\n",
++ if (!silent && (command == "check" || command == "scan" ||
++ command == "")) {
+ fprintf(out, "from [%s] to [%s]\n",
ROCKSDB_NAMESPACE::Slice(from_key).ToString(true).c_str(),
ROCKSDB_NAMESPACE::Slice(to_key).ToString(true).c_str());
}
-@@ -449,7 +451,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -449,7 +455,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
compression_zstd_max_train_bytes, compression_max_dict_buffer_bytes,
!compression_use_zstd_finalize_dict);
if (!st.ok()) {
@@ -397,7 +448,7 @@ index 7053366e7..4bde14fff 100644
exit(1);
}
return 0;
-@@ -461,10 +463,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -461,10 +467,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
st = dumper.DumpTable(out_filename);
if (!st.ok()) {
@@ -410,7 +461,7 @@ index 7053366e7..4bde14fff 100644
}
continue;
}
-@@ -476,7 +478,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -476,7 +482,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
has_from || use_from_as_prefix, from_key, has_to, to_key,
use_from_as_prefix);
if (!st.ok()) {
@@ -419,7 +470,7 @@ index 7053366e7..4bde14fff 100644
st.ToString().c_str());
}
total_read += dumper.GetReadNumber();
-@@ -488,10 +490,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -488,10 +494,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
if (command == "verify") {
st = dumper.VerifyChecksum();
if (!st.ok()) {
@@ -432,7 +483,7 @@ index 7053366e7..4bde14fff 100644
}
continue;
}
-@@ -503,15 +505,15 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -503,15 +509,15 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
table_properties_from_reader;
st = dumper.ReadTableProperties(&table_properties_from_reader);
if (!st.ok()) {
@@ -451,7 +502,7 @@ index 7053366e7..4bde14fff 100644
"Table Properties:\n"
"------------------------------\n"
" %s",
-@@ -523,30 +525,30 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
+@@ -523,18 +529,18 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
total_index_block_size += table_properties->index_size;
total_filter_block_size += table_properties->filter_size;
if (show_properties) {
@@ -473,30 +524,11 @@ index 7053366e7..4bde14fff 100644
}
}
}
- if (show_summary) {
-- fprintf(stdout, "total number of files: %" PRIu64 "\n", total_num_files);
-- fprintf(stdout, "total number of data blocks: %" PRIu64 "\n",
-+ fprintf(out, "total number of files: %" PRIu64 "\n", total_num_files);
-+ fprintf(out, "total number of data blocks: %" PRIu64 "\n",
- total_num_data_blocks);
-- fprintf(stdout, "total data block size: %" PRIu64 "\n",
-+ fprintf(out, "total data block size: %" PRIu64 "\n",
- total_data_block_size);
-- fprintf(stdout, "total index block size: %" PRIu64 "\n",
-+ fprintf(out, "total index block size: %" PRIu64 "\n",
- total_index_block_size);
-- fprintf(stdout, "total filter block size: %" PRIu64 "\n",
-+ fprintf(out, "total filter block size: %" PRIu64 "\n",
- total_filter_block_size);
- }
-
-@@ -554,24 +556,24 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
- // No valid SST files are found
+@@ -555,9 +561,9 @@ int SSTDumpTool::Run(int argc, char const* const* argv,
Options options) {
// Exit with an error state
if (dir) {
-- fprintf(stdout, "------------------------------\n");
+ fprintf(stdout, "------------------------------\n");
- fprintf(stderr, "No valid SST files found in %s\n", dir_or_file);
-+ fprintf(out, "------------------------------\n");
+ fprintf(err, "No valid SST files found in %s\n", dir_or_file);
} else {
- fprintf(stderr, "%s is not a valid SST file\n", dir_or_file);
@@ -504,22 +536,3 @@ index 7053366e7..4bde14fff 100644
}
return 1;
} else {
- if (command == "identify") {
- if (dir) {
-- fprintf(stdout, "------------------------------\n");
-- fprintf(stdout, "List of valid SST files found in %s:\n",
dir_or_file);
-+ fprintf(out, "------------------------------\n");
-+ fprintf(out, "List of valid SST files found in %s:\n", dir_or_file);
- for (const auto& f : valid_sst_files) {
-- fprintf(stdout, "%s\n", f.c_str());
-+ fprintf(out, "%s\n", f.c_str());
- }
-- fprintf(stdout, "Number of valid SST files: %zu\n",
-+ fprintf(out, "Number of valid SST files: %zu\n",
- valid_sst_files.size());
- } else {
-- fprintf(stdout, "%s is a valid SST file\n", dir_or_file);
-+ fprintf(out, "%s is a valid SST file\n", dir_or_file);
- }
- }
- // At least one valid SST
diff --git
a/hadoop-hdds/rocks-native/src/test/java/org/apache/hadoop/hdds/utils/db/managed/Native.java
b/hadoop-hdds/rocks-native/src/test/java/org/apache/hadoop/hdds/utils/db/managed/Native.java
new file mode 100644
index 0000000000..86b980907a
--- /dev/null
+++
b/hadoop-hdds/rocks-native/src/test/java/org/apache/hadoop/hdds/utils/db/managed/Native.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db.managed;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.junit.jupiter.api.Tag;
+
+/**
+ * Annotation to mark test classes or methods with some intermittent failures.
+ * These are handled separately from the normal tests. (Not required to pass,
+ * may be repeated automatically, etc.)
+ */
+@Target({ ElementType.TYPE, ElementType.METHOD })
+@Retention(RetentionPolicy.RUNTIME)
+@Tag("native")
+public @interface Native {
+ /**
+ * Native Library being used.
+ */
+ String value() default "";
+}
diff --git
a/hadoop-hdds/rocks-native/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedSSTDumpIterator.java
b/hadoop-hdds/rocks-native/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedSSTDumpIterator.java
new file mode 100644
index 0000000000..2167c352b3
--- /dev/null
+++
b/hadoop-hdds/rocks-native/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedSSTDumpIterator.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db.managed;
+
+import com.google.common.primitives.Bytes;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Test for ManagedSSTDumpIterator.
+ */
+@Native("Managed Rocks Tools")
+public class TestManagedSSTDumpIterator {
+
+ private File createSSTFileWithKeys(
+ TreeMap<Pair<String, Integer>, String> keys) throws Exception {
+ File file = File.createTempFile("tmp_sst_file", ".sst");
+ file.deleteOnExit();
+ try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+ ManagedOptions managedOptions = new ManagedOptions();
+ ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(
+ envOptions, managedOptions)) {
+ sstFileWriter.open(file.getAbsolutePath());
+ for (Map.Entry<Pair<String, Integer>, String> entry : keys.entrySet()) {
+ if (entry.getKey().getValue() == 0) {
+ sstFileWriter.delete(entry.getKey().getKey()
+ .getBytes(StandardCharsets.UTF_8));
+ } else {
+ sstFileWriter.put(entry.getKey().getKey()
+ .getBytes(StandardCharsets.UTF_8),
+ entry.getValue().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ sstFileWriter.finish();
+ }
+ return file;
+ }
+
+ private static Stream<? extends Arguments> keyValueFormatArgs() {
+ return Stream.of(
+ Arguments.of(
+ Named.of("Key starting with a single quote",
+ "'key%1$d=>"),
+ Named.of("Value starting with a number ending with a" +
+ " single quote", "%1$dvalue'")
+ ),
+ Arguments.of(
+ Named.of("Key ending with a number", "key%1$d"),
+ Named.of("Value starting & ending with a number", "%1$dvalue%1$d")
+ ),
+ Arguments.of(
+ Named.of("Key starting with a single quote & ending" +
+ " with a number", "'key%1$d"),
+ Named.of("Value starting & ending with a number " +
+ "& enclosed within quotes", "%1$d'value%1$d'")),
+ Arguments.of(
+ Named.of("Key starting with a single quote & ending" +
+ " with a number", "'key%1$d"),
+ Named.of("Value starting & ending with a number " +
+ "& enclosed within quotes", "%1$d'value%1$d'")
+ ),
+ Arguments.of(
+ Named.of("Key ending with a number", "key%1$d"),
+ Named.of("Value starting & ending with a number " +
+ "& containing null character & new line character",
+ "%1$dvalue\n\0%1$d")
+ ),
+ Arguments.of(
+ Named.of("Key ending with a number & containing" +
+ " a null character", "key\0%1$d"),
+ Named.of("Value starting & ending with a number " +
+ "& enclosed within quotes", "%1$dvalue\r%1$d")
+ )
+ );
+ }
+
+ private static byte[] getBytes(Integer val) {
+ ByteBuffer destByteBuffer = ByteBuffer.allocate(4);
+ destByteBuffer.order(ByteOrder.BIG_ENDIAN);
+ destByteBuffer.putInt(val);
+ return destByteBuffer.array();
+ }
+
+ private static byte[] getBytes(Long val) {
+ ByteBuffer destByteBuffer = ByteBuffer.allocate(8);
+ destByteBuffer.order(ByteOrder.BIG_ENDIAN);
+ destByteBuffer.putLong(val);
+ return destByteBuffer.array();
+ }
+
+ private static byte[] getBytes(String val) {
+ byte[] b = new byte[val.length()];
+ for (int i = 0; i < val.length(); i++) {
+ b[i] = (byte) val.charAt(i);
+ }
+ return b;
+ }
+
+ private static Stream<? extends Arguments> invalidPipeInputStreamBytes() {
+ return Stream.of(
+ Arguments.of(Named.of("Invalid 3 byte integer",
+ new byte[]{0, 0, 0})),
+ Arguments.of(Named.of("Invalid 2 byte integer",
+ new byte[]{0, 0})),
+ Arguments.of(Named.of("Invalid 1 byte integer",
+ new byte[]{0, 0})),
+ Arguments.of(Named.of("Invalid key name length",
+ Bytes.concat(getBytes(4), getBytes("key")))),
+ Arguments.of(Named.of("Invalid Unsigned Long length",
+ Bytes.concat(getBytes(4), getBytes("key1"),
+ new byte[]{0, 0}))),
+ Arguments.of(Named.of("Invalid Sequence number",
+ Bytes.concat(getBytes(4), getBytes("key1")))),
+ Arguments.of(Named.of("Invalid Type",
+ Bytes.concat(getBytes(4), getBytes("key1"),
+ getBytes(4L)))),
+ Arguments.of(Named.of("Invalid Value",
+ Bytes.concat(getBytes(4), getBytes("key"),
+ getBytes(4L), getBytes(0)))),
+ Arguments.of(Named.of("Invalid Value length",
+ Bytes.concat(getBytes(4), getBytes("key"),
+ getBytes(4L), getBytes(1), getBytes(6),
+ getBytes("val"))))
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("keyValueFormatArgs")
+ public void testSSTDumpIteratorWithKeyFormat(String keyFormat,
+ String valueFormat)
+ throws Exception {
+ TreeMap<Pair<String, Integer>, String> keys =
+ IntStream.range(0, 100).boxed().collect(
+ Collectors.toMap(
+ i -> Pair.of(String.format(keyFormat, i), i % 2),
+ i -> i % 2 == 0 ? "" : String.format(valueFormat, i),
+ (v1, v2) -> v2,
+ TreeMap::new));
+ File file = createSSTFileWithKeys(keys);
+ ExecutorService executorService =
+ new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ ManagedSSTDumpTool tool = new ManagedSSTDumpTool(executorService, 8192);
+ try (ManagedOptions options = new ManagedOptions(); ManagedSSTDumpIterator
+ <ManagedSSTDumpIterator.KeyValue> iterator =
+ new ManagedSSTDumpIterator<ManagedSSTDumpIterator.KeyValue>(tool,
+ file.getAbsolutePath(), options) {
+ @Override
+ protected KeyValue getTransformedValue(
+ Optional<KeyValue> value) {
+ return value.orElse(null);
+ }
+ }
+ ) {
+ while (iterator.hasNext()) {
+ ManagedSSTDumpIterator.KeyValue r = iterator.next();
+ Pair<String, Integer> recordKey = Pair.of(new String(r.getKey(),
+ StandardCharsets.UTF_8), r.getType());
+ Assertions.assertTrue(keys.containsKey(recordKey));
+ Assertions.assertEquals(
+ Optional.ofNullable(keys.get(recordKey)).orElse(""),
+ new String(r.getValue(), StandardCharsets.UTF_8));
+ keys.remove(recordKey);
+ }
+ Assertions.assertEquals(0, keys.size());
+ }
+ executorService.shutdown();
+ }
+
+
+ @ParameterizedTest
+ @MethodSource("invalidPipeInputStreamBytes")
+ public void testInvalidSSTDumpIteratorWithKeyFormat(byte[] inputBytes)
+ throws NativeLibraryNotLoadedException, ExecutionException,
+ InterruptedException, IOException {
+ ByteArrayInputStream byteArrayInputStream =
+ new ByteArrayInputStream(inputBytes);
+ ManagedSSTDumpTool tool = Mockito.mock(ManagedSSTDumpTool.class);
+ File file = File.createTempFile("tmp", ".sst");
+ Future future = Mockito.mock(Future.class);
+ Mockito.when(future.isDone()).thenReturn(false);
+ Mockito.when(future.get()).thenReturn(0);
+ Mockito.when(tool.run(Matchers.any(String[].class),
+ Matchers.any(ManagedOptions.class)))
+ .thenReturn(new ManagedSSTDumpTool.SSTDumpToolTask(future,
+ byteArrayInputStream));
+ try (ManagedOptions options = new ManagedOptions()) {
+ Assertions.assertThrows(IllegalStateException.class,
+ () -> new ManagedSSTDumpIterator<ManagedSSTDumpIterator.KeyValue>(
+ tool, file.getAbsolutePath(), options) {
+ @Override
+ protected KeyValue getTransformedValue(
+ Optional<KeyValue> value) {
+ return value.orElse(null);
+ }
+ });
+ }
+ }
+}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
index 0e8155ff7a..774c2f637c 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
@@ -18,6 +18,7 @@
package org.apache.ozone.rocksdb.util;
+import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.util.ClosableIterator;
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
@@ -34,6 +35,8 @@ import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
@@ -122,8 +125,7 @@ public class ManagedSstFileReader {
}
public Stream<String> getKeyStreamWithTombstone(
- ManagedSSTDumpTool sstDumpTool) throws IOException, RocksDBException,
- NativeLibraryNotLoadedException {
+ ManagedSSTDumpTool sstDumpTool) throws RocksDBException {
final MultipleSstFileIterator<String> itr =
new MultipleSstFileIterator<String>(sstFiles) {
//TODO: [SNAPSHOT] Check if default Options is enough.
@@ -140,8 +142,9 @@ public class ManagedSstFileReader {
return new ManagedSSTDumpIterator<String>(sstDumpTool, file,
options) {
@Override
- protected String getTransformedValue(KeyValue value) {
- return value.getKey();
+ protected String getTransformedValue(Optional<KeyValue> value) {
+ return value.map(v -> StringUtils.bytes2String(v.getKey()))
+ .orElse(null);
}
};
}
@@ -198,12 +201,9 @@ public class ManagedSstFileReader {
private String currentFile;
private ClosableIterator<T> currentFileIterator;
- private MultipleSstFileIterator(Collection<String> files)
- throws IOException, RocksDBException,
- NativeLibraryNotLoadedException {
+ private MultipleSstFileIterator(Collection<String> files) {
this.fileNameIterator = files.iterator();
init();
- moveToNextFile();
}
protected abstract void init();
@@ -216,7 +216,8 @@ public class ManagedSstFileReader {
public boolean hasNext() {
try {
do {
- if (currentFileIterator.hasNext()) {
+ if (Objects.nonNull(currentFileIterator) &&
+ currentFileIterator.hasNext()) {
return true;
}
} while (moveToNextFile());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]