This is an automated email from the ASF dual-hosted git repository.
prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5b1319a8c2 HDDS-8277. [Snapshot] Parsing Bug in SSTDumptool (#4470)
5b1319a8c2 is described below
commit 5b1319a8c227cb847536837875f0a3daf8697a83
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Apr 24 11:42:12 2023 -0700
HDDS-8277. [Snapshot] Parsing Bug in SSTDumptool (#4470)
* HDDS-8277. Parsing Bug in SSTDumptool
* HDDS-8277. Add license headers
* HDDS-8277. Address review comments
* HDDS-8277. Fix checkstyle
* HDDS-8277: Fix indentation
---
.../utils/db/managed/ManagedSSTDumpIterator.java | 111 +++++++++++++--------
.../hdds/utils/db/managed/ManagedSSTDumpTool.java | 17 ++--
.../src/main/patches/rocks-native.patch | 18 ++--
3 files changed, 93 insertions(+), 53 deletions(-)
diff --git
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
index a083a2b988..af54208cb0 100644
---
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
+++
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
@@ -29,8 +29,8 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
+import java.util.Optional;
import java.util.Arrays;
-import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -43,18 +43,18 @@ public abstract class ManagedSSTDumpIterator<T> implements
ClosableIterator<T> {
private static final Logger LOG =
LoggerFactory.getLogger(ManagedSSTDumpIterator.class);
+ // Since there is no restriction on the characters in a key, the length of
+ // the pattern is prepended to it in the sst dump tool output.
+ // The first token in the pattern is the key.
+ // The second tells the sequence number of the key.
+ // The third token gives the type of key in the sst file.
private static final String PATTERN_REGEX =
- "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
-
+ "'([\\s\\S]+)' seq:([0-9]+), type:([0-9]+)";
public static final int PATTERN_KEY_GROUP_NUMBER = 1;
public static final int PATTERN_SEQ_GROUP_NUMBER = 2;
public static final int PATTERN_TYPE_GROUP_NUMBER = 3;
private static final Pattern PATTERN_MATCHER =
Pattern.compile(PATTERN_REGEX);
private BufferedReader processOutput;
- private StringBuilder stdoutString;
-
- private Matcher currentMatcher;
- private int prevMatchEndIndex;
private KeyValue currentKey;
private char[] charBuffer;
private KeyValue nextKey;
@@ -80,16 +80,55 @@ public abstract class ManagedSSTDumpIterator<T> implements
ClosableIterator<T> {
this.stackTrace = Thread.currentThread().getStackTrace();
}
+ /**
+ * Parses next occurring number in the stream.
+ *
+ * @return Optional of the integer, or empty if no integer exists
+ */
+ private Optional<Integer> getNextNumberInStream() throws IOException {
+ StringBuilder value = new StringBuilder();
+ int val;
+ while ((val = processOutput.read()) != -1) {
+ if (val >= '0' && val <= '9') {
+ value.append((char) val);
+ } else if (value.length() > 0) {
+ break;
+ }
+ }
+ return value.length() > 0 ? Optional.of(Integer.valueOf(value.toString()))
+ : Optional.empty();
+ }
+
+ /**
+ * Reads the next n chars from the stream & makes a string.
+ *
+ * @param numberOfChars
+ * @return String of next chars read
+ * @throws IOException
+ */
+ private String readNextNumberOfCharsFromStream(int numberOfChars)
+ throws IOException {
+ StringBuilder value = new StringBuilder();
+ while (numberOfChars > 0) {
+ int noOfCharsRead = processOutput.read(charBuffer, 0,
+ Math.min(numberOfChars, charBuffer.length));
+ if (noOfCharsRead == -1) {
+ break;
+ }
+ value.append(charBuffer, 0, noOfCharsRead);
+ numberOfChars -= noOfCharsRead;
+ }
+
+ return value.toString();
+ }
+
private void init(ManagedSSTDumpTool sstDumpTool, File sstFile,
ManagedOptions options)
throws NativeLibraryNotLoadedException {
String[] args = {"--file=" + sstFile.getAbsolutePath(), "--command=scan"};
this.sstDumpToolTask = sstDumpTool.run(args, options);
- processOutput = new BufferedReader(
- new InputStreamReader(sstDumpToolTask.getPipedOutput(),
- StandardCharsets.UTF_8));
- stdoutString = new StringBuilder();
- currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
+ processOutput = new BufferedReader(new InputStreamReader(
+ sstDumpToolTask.getPipedOutput(), StandardCharsets.UTF_8));
charBuffer = new char[8192];
open = new AtomicBoolean(true);
next();
@@ -142,37 +181,33 @@ public abstract class ManagedSSTDumpIterator<T>
implements ClosableIterator<T> {
checkSanityOfProcess();
currentKey = nextKey;
nextKey = null;
- while (!currentMatcher.find()) {
+ boolean keyFound = false;
+ while (!keyFound) {
try {
- if (prevMatchEndIndex != 0) {
- stdoutString = new StringBuilder(
- stdoutString.substring(prevMatchEndIndex,
stdoutString.length()));
- prevMatchEndIndex = 0;
- currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
+ Optional<Integer> keyLength = getNextNumberInStream();
+ if (!keyLength.isPresent()) {
+ return getTransformedValue(currentKey);
}
- int numberOfCharsRead = processOutput.read(charBuffer);
- if (numberOfCharsRead < 0) {
- if (currentKey != null) {
- currentKey.setValue(stdoutString.substring(0,
- Math.max(stdoutString.length() - 1, 0)));
- return getTransformedValue(currentKey);
+ String keyStr = readNextNumberOfCharsFromStream(keyLength.get());
+ Matcher matcher = PATTERN_MATCHER.matcher(keyStr);
+ if (keyStr.length() == keyLength.get() && matcher.find()) {
+ Optional<Integer> valueLength = getNextNumberInStream();
+ if (valueLength.isPresent()) {
+ String valueStr = readNextNumberOfCharsFromStream(
+ valueLength.get());
+ if (valueStr.length() == valueLength.get()) {
+ keyFound = true;
+ nextKey = new KeyValue(matcher.group(PATTERN_KEY_GROUP_NUMBER),
+ matcher.group(PATTERN_SEQ_GROUP_NUMBER),
+ matcher.group(PATTERN_TYPE_GROUP_NUMBER),
+ valueStr);
+ }
}
- throw new NoSuchElementException("No more elements found");
}
- stdoutString.append(charBuffer, 0, numberOfCharsRead);
- currentMatcher.reset();
} catch (IOException e) {
throw new RuntimeIOException(e);
}
}
- if (currentKey != null) {
- currentKey.setValue(stdoutString.substring(prevMatchEndIndex,
- currentMatcher.start() - 1));
- }
- prevMatchEndIndex = currentMatcher.end();
- nextKey = new KeyValue(currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
- currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
- currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
return getTransformedValue(currentKey);
}
@@ -215,13 +250,11 @@ public abstract class ManagedSSTDumpIterator<T>
implements ClosableIterator<T> {
private String value;
- private KeyValue(String key, String sequence, String type) {
+ private KeyValue(String key, String sequence, String type,
+ String value) {
this.key = key;
this.sequence = Integer.valueOf(sequence);
this.type = Integer.valueOf(type);
- }
-
- private void setValue(String value) {
this.value = value;
}
diff --git
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
index 940438a457..649378fad6 100644
---
a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
+++
b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
@@ -35,35 +35,36 @@ public class ManagedSSTDumpTool {
static {
NativeLibraryLoader.getInstance()
- .loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+ .loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
}
+
private int bufferCapacity;
private ExecutorService executorService;
public ManagedSSTDumpTool(ExecutorService executorService,
int bufferCapacity)
- throws NativeLibraryNotLoadedException {
+ throws NativeLibraryNotLoadedException {
if (!NativeLibraryLoader.isLibraryLoaded(ROCKS_TOOLS_NATIVE_LIBRARY_NAME))
{
throw new NativeLibraryNotLoadedException(
- ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+ ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
}
this.bufferCapacity = bufferCapacity;
this.executorService = executorService;
}
public SSTDumpToolTask run(String[] args, ManagedOptions options)
- throws NativeLibraryNotLoadedException {
+ throws NativeLibraryNotLoadedException {
PipeInputStream pipeInputStream = new PipeInputStream(bufferCapacity);
return new SSTDumpToolTask(this.executorService.submit(() ->
- this.runInternal(args, options.getNativeHandle(),
+ this.runInternal(args, options.getNativeHandle(),
pipeInputStream.getNativeHandle())), pipeInputStream);
}
public SSTDumpToolTask run(Map<String, String> args, ManagedOptions options)
- throws NativeLibraryNotLoadedException {
+ throws NativeLibraryNotLoadedException {
return this.run(args.entrySet().stream().map(e -> "--"
- + (e.getValue() == null || e.getValue().isEmpty() ? e.getKey() :
- e.getKey() + "=" + e.getValue())).toArray(String[]::new), options);
+ + (e.getValue() == null || e.getValue().isEmpty() ? e.getKey() :
+ e.getKey() + "=" + e.getValue())).toArray(String[]::new), options);
}
private native int runInternal(String[] args, long optionsHandle,
diff --git a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
index eb4501a77a..559d21495a 100644
--- a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
+++ b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
@@ -31,7 +31,7 @@ index 9261ba47d..09ed123e5 100644
} // namespace ROCKSDB_NAMESPACE
diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc
-index eefbaaeee..3f2b42324 100644
+index eefbaaeee..503ec7aa6 100644
--- a/table/sst_file_dumper.cc
+++ b/table/sst_file_dumper.cc
@@ -45,7 +45,7 @@ SstFileDumper::SstFileDumper(const Options& options,
@@ -153,30 +153,36 @@ index eefbaaeee..3f2b42324 100644
}
return Status::OK();
-@@ -478,7 +481,7 @@ Status SstFileDumper::ReadSequential(bool print_kv,
uint64_t read_num,
+@@ -478,21 +481,26 @@ Status SstFileDumper::ReadSequential(bool print_kv,
uint64_t read_num,
if (print_kv) {
if (!decode_blob_index_ || ikey.type != kTypeBlobIndex) {
- fprintf(stdout, "%s => %s\n",
-+ fprintf(out_, "%s => %s\n",
++ fprintf(out_, "%lu %s => %lu %s\n",
++ ikey.DebugString(true, output_hex_).length(),
ikey.DebugString(true, output_hex_).c_str(),
++ value.ToString(output_hex_).length(),
value.ToString(output_hex_).c_str());
} else {
-@@ -486,12 +489,12 @@ Status SstFileDumper::ReadSequential(bool print_kv,
uint64_t read_num,
+ BlobIndex blob_index;
const Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
- fprintf(stderr, "%s => error decoding blob index\n",
-+ fprintf(err_, "%s => error decoding blob index\n",
++ fprintf(err_, "%lu %s => 25 error decoding blob index\n",
++ ikey.DebugString(true, output_hex_).length(),
ikey.DebugString(true, output_hex_).c_str());
continue;
}
- fprintf(stdout, "%s => %s\n",
-+ fprintf(out_, "%s => %s\n",
++ fprintf(out_, "%lu %s => %lu %s\n",
++ ikey.DebugString(true, output_hex_).length(),
ikey.DebugString(true, output_hex_).c_str(),
++ blob_index.DebugString(output_hex_).length(),
blob_index.DebugString(output_hex_).c_str());
}
+ }
diff --git a/table/sst_file_dumper.h b/table/sst_file_dumper.h
index 7be876390..ea07154da 100644
--- a/table/sst_file_dumper.h
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]