This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b1319a8c2 HDDS-8277. [Snapshot] Parsing Bug in SSTDumptool (#4470)
5b1319a8c2 is described below

commit 5b1319a8c227cb847536837875f0a3daf8697a83
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Apr 24 11:42:12 2023 -0700

    HDDS-8277. [Snapshot] Parsing Bug in SSTDumptool (#4470)
    
    * HDDS-8277. Parsing Bug in SSTDumptool
    
    * HDDS-8277. Add license headers
    
    * HDDS-8277. Address review comments
    
    * HDDS-8277. Fix checkstyle
    
    * HDDS-8277: Fix indentation
---
 .../utils/db/managed/ManagedSSTDumpIterator.java   | 111 +++++++++++++--------
 .../hdds/utils/db/managed/ManagedSSTDumpTool.java  |  17 ++--
 .../src/main/patches/rocks-native.patch            |  18 ++--
 3 files changed, 93 insertions(+), 53 deletions(-)

diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
index a083a2b988..af54208cb0 100644
--- a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
+++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
@@ -29,8 +29,8 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Optional;
 import java.util.Arrays;
-import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -43,18 +43,18 @@ public abstract class ManagedSSTDumpIterator<T> implements ClosableIterator<T> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ManagedSSTDumpIterator.class);
+  // Keys may contain arbitrary characters, so the sst dump tool output
+  // prefixes each key's debug string and each value with its length.
+  // The first token in the pattern is the key.
+  // The second tells the sequence number of the key.
+  // The third token gives the type of key in the sst file.
   private static final String PATTERN_REGEX =
-      "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
-
+      "'([\\s\\S]+)' seq:([0-9]+), type:([0-9]+)";
   public static final int PATTERN_KEY_GROUP_NUMBER = 1;
   public static final int PATTERN_SEQ_GROUP_NUMBER = 2;
   public static final int PATTERN_TYPE_GROUP_NUMBER = 3;
   private static final Pattern PATTERN_MATCHER = Pattern.compile(PATTERN_REGEX);
   private BufferedReader processOutput;
-  private StringBuilder stdoutString;
-
-  private Matcher currentMatcher;
-  private int prevMatchEndIndex;
   private KeyValue currentKey;
   private char[] charBuffer;
   private KeyValue nextKey;
@@ -80,16 +80,55 @@ public abstract class ManagedSSTDumpIterator<T> implements ClosableIterator<T> {
     this.stackTrace = Thread.currentThread().getStackTrace();
   }
 
+  /**
+   * Parses the next number in the stream, skipping leading non-digit chars.
+   *
+   * @return Optional of the parsed integer, empty if EOF precedes any digit
+   */
+  private Optional<Integer> getNextNumberInStream() throws IOException {
+    StringBuilder value = new StringBuilder();
+    int val;
+    while ((val = processOutput.read()) != -1) {
+      if (val >= '0' && val <= '9') {
+        value.append((char) val);
+      } else if (value.length() > 0) {
+        break;
+      }
+    }
+    return value.length() > 0 ? Optional.of(Integer.valueOf(value.toString()))
+        : Optional.empty();
+  }
+
+  /**
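+   * Reads up to the next numberOfChars chars from the stream into a string.
+   *
+   * @param numberOfChars number of chars to read
+   * @return the chars read; shorter than requested if the stream ends early
+   * @throws IOException if reading from the process output fails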
+   */
+  private String readNextNumberOfCharsFromStream(int numberOfChars)
+      throws IOException {
+    StringBuilder value = new StringBuilder();
+    while (numberOfChars > 0) {
+      int noOfCharsRead = processOutput.read(charBuffer, 0,
+          Math.min(numberOfChars, charBuffer.length));
+      if (noOfCharsRead == -1) {
+        break;
+      }
+      value.append(charBuffer, 0, noOfCharsRead);
+      numberOfChars -= noOfCharsRead;
+    }
+
+    return value.toString();
+  }
+
   private void init(ManagedSSTDumpTool sstDumpTool, File sstFile,
                     ManagedOptions options)
       throws NativeLibraryNotLoadedException {
     String[] args = {"--file=" + sstFile.getAbsolutePath(), "--command=scan"};
     this.sstDumpToolTask = sstDumpTool.run(args, options);
-    processOutput = new BufferedReader(
-        new InputStreamReader(sstDumpToolTask.getPipedOutput(),
-            StandardCharsets.UTF_8));
-    stdoutString = new StringBuilder();
-    currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
+    processOutput = new BufferedReader(new InputStreamReader(
+        sstDumpToolTask.getPipedOutput(), StandardCharsets.UTF_8));
     charBuffer = new char[8192];
     open = new AtomicBoolean(true);
     next();
@@ -142,37 +181,33 @@ public abstract class ManagedSSTDumpIterator<T> implements ClosableIterator<T> {
     checkSanityOfProcess();
     currentKey = nextKey;
     nextKey = null;
-    while (!currentMatcher.find()) {
+    boolean keyFound = false;
+    while (!keyFound) {
       try {
-        if (prevMatchEndIndex != 0) {
-          stdoutString = new StringBuilder(
-              stdoutString.substring(prevMatchEndIndex, stdoutString.length()));
-          prevMatchEndIndex = 0;
-          currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
+        Optional<Integer> keyLength = getNextNumberInStream();
+        if (!keyLength.isPresent()) {
+          return getTransformedValue(currentKey);
         }
-        int numberOfCharsRead = processOutput.read(charBuffer);
-        if (numberOfCharsRead < 0) {
-          if (currentKey != null) {
-            currentKey.setValue(stdoutString.substring(0,
-                Math.max(stdoutString.length() - 1, 0)));
-            return getTransformedValue(currentKey);
+        String keyStr = readNextNumberOfCharsFromStream(keyLength.get());
+        Matcher matcher = PATTERN_MATCHER.matcher(keyStr);
+        if (keyStr.length() == keyLength.get() && matcher.find()) {
+          Optional<Integer> valueLength = getNextNumberInStream();
+          if (valueLength.isPresent()) {
+            String valueStr = readNextNumberOfCharsFromStream(
+                valueLength.get());
+            if (valueStr.length() == valueLength.get()) {
+              keyFound = true;
+              nextKey = new KeyValue(matcher.group(PATTERN_KEY_GROUP_NUMBER),
+                  matcher.group(PATTERN_SEQ_GROUP_NUMBER),
+                  matcher.group(PATTERN_TYPE_GROUP_NUMBER),
+                  valueStr);
+            }
           }
-          throw new NoSuchElementException("No more elements found");
         }
-        stdoutString.append(charBuffer, 0, numberOfCharsRead);
-        currentMatcher.reset();
       } catch (IOException e) {
         throw new RuntimeIOException(e);
       }
     }
-    if (currentKey != null) {
-      currentKey.setValue(stdoutString.substring(prevMatchEndIndex,
-          currentMatcher.start() - 1));
-    }
-    prevMatchEndIndex = currentMatcher.end();
-    nextKey = new KeyValue(currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
-        currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
-        currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
     return getTransformedValue(currentKey);
   }
 
@@ -215,13 +250,11 @@ public abstract class ManagedSSTDumpIterator<T> implements ClosableIterator<T> {
 
     private String value;
 
-    private KeyValue(String key, String sequence, String type) {
+    private KeyValue(String key, String sequence, String type,
+                     String value) {
       this.key = key;
       this.sequence = Integer.valueOf(sequence);
       this.type = Integer.valueOf(type);
-    }
-
-    private void setValue(String value) {
       this.value = value;
     }
 
diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
index 940438a457..649378fad6 100644
--- a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
+++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java
@@ -35,35 +35,36 @@ public class ManagedSSTDumpTool {
 
   static {
     NativeLibraryLoader.getInstance()
-            .loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+        .loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
   }
+
   private int bufferCapacity;
   private ExecutorService executorService;
 
   public ManagedSSTDumpTool(ExecutorService executorService,
                             int bufferCapacity)
-          throws NativeLibraryNotLoadedException {
+      throws NativeLibraryNotLoadedException {
     if (!NativeLibraryLoader.isLibraryLoaded(ROCKS_TOOLS_NATIVE_LIBRARY_NAME)) {
       throw new NativeLibraryNotLoadedException(
-              ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+          ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
     }
     this.bufferCapacity = bufferCapacity;
     this.executorService = executorService;
   }
 
   public SSTDumpToolTask run(String[] args, ManagedOptions options)
-          throws NativeLibraryNotLoadedException {
+      throws NativeLibraryNotLoadedException {
     PipeInputStream pipeInputStream = new PipeInputStream(bufferCapacity);
     return new SSTDumpToolTask(this.executorService.submit(() ->
-            this.runInternal(args, options.getNativeHandle(),
+        this.runInternal(args, options.getNativeHandle(),
             pipeInputStream.getNativeHandle())), pipeInputStream);
   }
 
   public SSTDumpToolTask run(Map<String, String> args, ManagedOptions options)
-          throws NativeLibraryNotLoadedException {
+      throws NativeLibraryNotLoadedException {
     return this.run(args.entrySet().stream().map(e -> "--"
-            + (e.getValue() == null || e.getValue().isEmpty() ? e.getKey() :
-            e.getKey() + "=" + e.getValue())).toArray(String[]::new), options);
+        + (e.getValue() == null || e.getValue().isEmpty() ? e.getKey() :
+        e.getKey() + "=" + e.getValue())).toArray(String[]::new), options);
   }
 
   private native int runInternal(String[] args, long optionsHandle,
diff --git a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
index eb4501a77a..559d21495a 100644
--- a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
+++ b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
@@ -31,7 +31,7 @@ index 9261ba47d..09ed123e5 100644
  
  }  // namespace ROCKSDB_NAMESPACE
 diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc
-index eefbaaeee..3f2b42324 100644
+index eefbaaeee..503ec7aa6 100644
 --- a/table/sst_file_dumper.cc
 +++ b/table/sst_file_dumper.cc
 @@ -45,7 +45,7 @@ SstFileDumper::SstFileDumper(const Options& options,
@@ -153,30 +153,36 @@ index eefbaaeee..3f2b42324 100644
    }
  
    return Status::OK();
-@@ -478,7 +481,7 @@ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num,
+@@ -478,21 +481,26 @@ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num,
  
      if (print_kv) {
        if (!decode_blob_index_ || ikey.type != kTypeBlobIndex) {
 -        fprintf(stdout, "%s => %s\n",
-+        fprintf(out_, "%s => %s\n",
++        fprintf(out_, "%lu %s => %lu %s\n",
++                ikey.DebugString(true, output_hex_).length(),
                  ikey.DebugString(true, output_hex_).c_str(),
++                value.ToString(output_hex_).length(),
                  value.ToString(output_hex_).c_str());
        } else {
-@@ -486,12 +489,12 @@ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num,
+         BlobIndex blob_index;
  
          const Status s = blob_index.DecodeFrom(value);
          if (!s.ok()) {
 -          fprintf(stderr, "%s => error decoding blob index\n",
-+          fprintf(err_, "%s => error decoding blob index\n",
++          fprintf(err_, "%lu %s => 25 error decoding blob index\n",
++                  ikey.DebugString(true, output_hex_).length(),
                    ikey.DebugString(true, output_hex_).c_str());
            continue;
          }
  
 -        fprintf(stdout, "%s => %s\n",
-+        fprintf(out_, "%s => %s\n",
++        fprintf(out_, "%lu %s => %lu %s\n",
++                ikey.DebugString(true, output_hex_).length(),
                  ikey.DebugString(true, output_hex_).c_str(),
++                blob_index.DebugString(output_hex_).length(),
                  blob_index.DebugString(output_hex_).c_str());
        }
+     }
 diff --git a/table/sst_file_dumper.h b/table/sst_file_dumper.h
 index 7be876390..ea07154da 100644
 --- a/table/sst_file_dumper.h


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to