This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f745fcc96 HDDS-8137. [Snapshot] SnapDiff to use tombstone entries in SST files (#4376)
8f745fcc96 is described below

commit 8f745fcc961fc72b88005cbf669cfd503f8d3d93
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Tue Apr 18 13:19:22 2023 -0700

    HDDS-8137. [Snapshot] SnapDiff to use tombstone entries in SST files (#4376)
---
 .../common/src/main/resources/ozone-default.xml    |  18 ++
 .../utils/db/managed/ManagedSSTDumpIterator.java   | 105 ++++++-----
 hadoop-hdds/rocksdb-checkpoint-differ/pom.xml      |   4 +
 .../ozone/rocksdb/util/ManagedSstFileReader.java   | 186 ++++++++++++++++----
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  11 ++
 hadoop-ozone/ozone-manager/pom.xml                 |   4 +
 .../ozone/om/snapshot/SnapshotDiffManager.java     | 193 ++++++++++++++-------
 7 files changed, 386 insertions(+), 135 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 018ff80667..f8f908b909 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3656,4 +3656,22 @@
       without using the optimised DAG based pruning approach
     </description>
   </property>
+
+  <property>
+    <name>ozone.om.snapshot.sst_dumptool.pool.size</name>
+    <value>1</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      Threadpool size for SST Dumptool which would be used for computing snapdiff when native library is enabled.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.snapshot.sst_dumptool.buffer.size</name>
+    <value>8KB</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      Buffer size for SST Dumptool Pipe which would be used for computing snapdiff when native library is enabled.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
index 35aaeb33b0..a083a2b988 100644
--- a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
+++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
@@ -17,34 +17,39 @@
 
 package org.apache.hadoop.hdds.utils.db.managed;
 
+import org.apache.hadoop.util.ClosableIterator;
 import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
 import org.eclipse.jetty.io.RuntimeIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Iterator to Parse output of RocksDBSSTDumpTool.
  */
-public class ManagedSSTDumpIterator implements
-        Iterator<ManagedSSTDumpIterator.KeyValue>, AutoCloseable {
-  private static final String SST_DUMP_TOOL_CLASS =
-          "org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool";
+public abstract class ManagedSSTDumpIterator<T> implements ClosableIterator<T> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ManagedSSTDumpIterator.class);
   private static final String PATTERN_REGEX =
-          "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
+      "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
 
   public static final int PATTERN_KEY_GROUP_NUMBER = 1;
   public static final int PATTERN_SEQ_GROUP_NUMBER = 2;
   public static final int PATTERN_TYPE_GROUP_NUMBER = 3;
-  private static final Pattern PATTERN_MATCHER =
-          Pattern.compile(PATTERN_REGEX);
+  private static final Pattern PATTERN_MATCHER = Pattern.compile(PATTERN_REGEX);
   private BufferedReader processOutput;
   private StringBuilder stdoutString;
 
@@ -56,32 +61,33 @@ public class ManagedSSTDumpIterator implements
 
   private ManagedSSTDumpTool.SSTDumpToolTask sstDumpToolTask;
   private AtomicBoolean open;
+  private StackTraceElement[] stackTrace;
 
 
   public ManagedSSTDumpIterator(ManagedSSTDumpTool sstDumpTool,
-                                String sstFilePath,
-                                ManagedOptions options) throws IOException,
-          NativeLibraryNotLoadedException {
+                                String sstFilePath, ManagedOptions options)
+      throws IOException, NativeLibraryNotLoadedException {
     File sstFile = new File(sstFilePath);
     if (!sstFile.exists()) {
       throw new IOException(String.format("File in path : %s doesn't exist",
-              sstFile.getAbsolutePath()));
+          sstFile.getAbsolutePath()));
     }
     if (!sstFile.isFile()) {
       throw new IOException(String.format("Path given: %s is not a file",
-              sstFile.getAbsolutePath()));
+          sstFile.getAbsolutePath()));
     }
     init(sstDumpTool, sstFile, options);
+    this.stackTrace = Thread.currentThread().getStackTrace();
   }
 
   private void init(ManagedSSTDumpTool sstDumpTool, File sstFile,
                     ManagedOptions options)
-          throws NativeLibraryNotLoadedException {
-    String[] args = {"--file=" + sstFile.getAbsolutePath(),
-                     "--command=scan"};
+      throws NativeLibraryNotLoadedException {
+    String[] args = {"--file=" + sstFile.getAbsolutePath(), "--command=scan"};
     this.sstDumpToolTask = sstDumpTool.run(args, options);
-    processOutput = new BufferedReader(new InputStreamReader(
-            sstDumpToolTask.getPipedOutput(), StandardCharsets.UTF_8));
+    processOutput = new BufferedReader(
+        new InputStreamReader(sstDumpToolTask.getPipedOutput(),
+            StandardCharsets.UTF_8));
     stdoutString = new StringBuilder();
     currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
     charBuffer = new char[8192];
@@ -97,15 +103,16 @@ public class ManagedSSTDumpIterator implements
     if (!this.open.get()) {
       throw new RuntimeException("Iterator has been closed");
     }
-    if (sstDumpToolTask.getFuture().isDone()
-            && sstDumpToolTask.exitValue() != 0) {
+    if (sstDumpToolTask.getFuture().isDone() &&
+        sstDumpToolTask.exitValue() != 0) {
       throw new RuntimeException("Process Terminated with non zero " +
-              String.format("exit value %d", sstDumpToolTask.exitValue()));
+          String.format("exit value %d", sstDumpToolTask.exitValue()));
     }
   }
 
   /**
    * Checks the status of the process & sees if there is another record.
+   *
    * @return True if next exists & false otherwise
    * Throws Runtime Exception in case of SST File read failure
    */
@@ -116,21 +123,30 @@ public class ManagedSSTDumpIterator implements
     return nextKey != null;
   }
 
+  /**
+   * Transforms Key to a certain value.
+   *
+   * @param value
+   * @return transformed Value
+   */
+  protected abstract T getTransformedValue(KeyValue value);
+
   /**
    * Returns the next record from SSTDumpTool.
+   *
    * @return next Key
    * Throws Runtime Exception incase of failure.
    */
   @Override
-  public KeyValue next() {
+  public T next() {
     checkSanityOfProcess();
     currentKey = nextKey;
     nextKey = null;
     while (!currentMatcher.find()) {
       try {
         if (prevMatchEndIndex != 0) {
-          stdoutString = new StringBuilder(stdoutString.substring(
-                  prevMatchEndIndex, stdoutString.length()));
+          stdoutString = new StringBuilder(
+              stdoutString.substring(prevMatchEndIndex, stdoutString.length()));
           prevMatchEndIndex = 0;
           currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
         }
@@ -138,9 +154,10 @@ public class ManagedSSTDumpIterator implements
         if (numberOfCharsRead < 0) {
           if (currentKey != null) {
             currentKey.setValue(stdoutString.substring(0,
-                    Math.max(stdoutString.length() - 1, 0)));
+                Math.max(stdoutString.length() - 1, 0)));
+            return getTransformedValue(currentKey);
           }
-          return currentKey;
+          throw new NoSuchElementException("No more elements found");
         }
         stdoutString.append(charBuffer, 0, numberOfCharsRead);
         currentMatcher.reset();
@@ -150,30 +167,42 @@ public class ManagedSSTDumpIterator implements
     }
     if (currentKey != null) {
       currentKey.setValue(stdoutString.substring(prevMatchEndIndex,
-              currentMatcher.start() - 1));
+          currentMatcher.start() - 1));
     }
     prevMatchEndIndex = currentMatcher.end();
-    nextKey = new KeyValue(
-            currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
-            currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
-            currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
-    return currentKey;
+    nextKey = new KeyValue(currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
+        currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
+        currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
+    return getTransformedValue(currentKey);
   }
 
   @Override
-  public synchronized void close() throws Exception {
+  public synchronized void close() throws UncheckedIOException {
     if (this.sstDumpToolTask != null) {
       if (!this.sstDumpToolTask.getFuture().isDone()) {
         this.sstDumpToolTask.getFuture().cancel(true);
       }
-      this.processOutput.close();
+      try {
+        this.processOutput.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
     }
     open.compareAndSet(true, false);
   }
 
   @Override
   protected void finalize() throws Throwable {
+    if (open.get()) {
+      LOG.warn("{}  is not closed properly." +
+              " StackTrace for unclosed instance: {}",
+          this.getClass().getName(),
+          Arrays.stream(stackTrace)
+              .map(StackTraceElement::toString).collect(
+                  Collectors.joining("\n")));
+    }
     this.close();
+    super.finalize();
   }
 
   /**
@@ -214,12 +243,8 @@ public class ManagedSSTDumpIterator implements
 
     @Override
     public String toString() {
-      return "KeyValue{" +
-              "key='" + key + '\'' +
-              ", sequence=" + sequence +
-              ", type=" + type +
-              ", value='" + value + '\'' +
-              '}';
+      return "KeyValue{" + "key='" + key + '\'' + ", sequence=" + sequence +
+          ", type=" + type + ", value='" + value + '\'' + '}';
     }
   }
 }
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index f2a932b40a..3c3764d386 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -73,6 +73,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>junit-jupiter-params</artifactId>
       <scope>test</scope>
     </dependency>
+      <dependency>
+          <groupId>org.apache.ozone</groupId>
+          <artifactId>hdds-rocks-native</artifactId>
+      </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
index cf8e59331d..ce05cd9715 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
@@ -18,13 +18,19 @@
 
 package org.apache.ozone.rocksdb.util;
 
-import org.rocksdb.Options;
+import org.apache.hadoop.util.ClosableIterator;
+import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.SstFileReader;
 import org.rocksdb.SstFileReaderIterator;
 
-import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -46,42 +52,153 @@ public class ManagedSstFileReader {
   public ManagedSstFileReader(final Collection<String> sstFiles) {
     this.sstFiles = sstFiles;
   }
-  public Stream<String> getKeyStream() throws RocksDBException {
-    final ManagedSstFileIterator itr = new ManagedSstFileIterator(sstFiles);
-    final Spliterator<String> spliterator = Spliterators
-        .spliteratorUnknownSize(itr, 0);
-    return StreamSupport.stream(spliterator, false).onClose(itr::close);
+
+  public static <T> Stream<T> getStreamFromIterator(ClosableIterator<T> itr) {
+    final Spliterator<T> spliterator =
+        Spliterators.spliteratorUnknownSize(itr, 0);
+    return StreamSupport.stream(spliterator, false).onClose(() -> {
+      itr.close();
+    });
+  }
+
+  public Stream<String> getKeyStream() throws RocksDBException,
+      NativeLibraryNotLoadedException, IOException {
+    // TODO: [SNAPSHOT] Check if default Options and ReadOptions is enough.
+    final MultipleSstFileIterator<String> itr =
+        new MultipleSstFileIterator<String>(sstFiles) {
+          private ManagedOptions options;
+          private ReadOptions readOptions;
+
+          @Override
+          protected void init() {
+            this.options = new ManagedOptions();
+            this.readOptions = new ManagedReadOptions();
+          }
+
+          @Override
+          protected ClosableIterator<String> getKeyIteratorForFile(String file)
+              throws RocksDBException {
+            return new ManagedSstFileIterator(file, options, readOptions) {
+              @Override
+              protected String getIteratorValue(
+                  SstFileReaderIterator iterator) {
+                return new String(iterator.key(), UTF_8);
+              }
+            };
+          }
+
+          @Override
+          public void close() throws UncheckedIOException {
+            super.close();
+            options.close();
+            readOptions.close();
+          }
+        };
+    return getStreamFromIterator(itr);
+  }
+
+  public Stream<String> getKeyStreamWithTombstone(
+      ManagedSSTDumpTool sstDumpTool) throws IOException, RocksDBException,
+      NativeLibraryNotLoadedException {
+    final MultipleSstFileIterator<String> itr =
+        new MultipleSstFileIterator<String>(sstFiles) {
+          //TODO: [SNAPSHOT] Check if default Options is enough.
+          private ManagedOptions options;
+
+          @Override
+          protected void init() {
+            this.options = new ManagedOptions();
+          }
+
+          @Override
+          protected ClosableIterator<String> getKeyIteratorForFile(String file)
+              throws NativeLibraryNotLoadedException, IOException {
+            return new ManagedSSTDumpIterator<String>(sstDumpTool, file,
+                options) {
+              @Override
+              protected String getTransformedValue(KeyValue value) {
+                return value.getKey();
+              }
+            };
+          }
+
+          @Override
+          public void close() throws UncheckedIOException {
+            super.close();
+            options.close();
+          }
+        };
+    return getStreamFromIterator(itr);
+  }
+
+  private abstract static class ManagedSstFileIterator implements
+      ClosableIterator<String> {
+    private SstFileReader fileReader;
+    private SstFileReaderIterator fileReaderIterator;
+
+    ManagedSstFileIterator(String path, ManagedOptions options,
+                           ReadOptions readOptions)
+        throws RocksDBException {
+      this.fileReader = new SstFileReader(options);
+      this.fileReader.open(path);
+      this.fileReaderIterator = fileReader.newIterator(readOptions);
+      fileReaderIterator.seekToFirst();
+    }
+
+    @Override
+    public void close() {
+      this.fileReaderIterator.close();
+      this.fileReader.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return fileReaderIterator.isValid();
+    }
+
+    protected abstract String getIteratorValue(SstFileReaderIterator iterator);
+
+    @Override
+    public String next() {
+      String value = getIteratorValue(fileReaderIterator);
+      fileReaderIterator.next();
+      return value;
+    }
   }
 
-  private static final class ManagedSstFileIterator implements
-      Iterator<String>, Closeable {
+  private abstract static class MultipleSstFileIterator<T> implements
+      ClosableIterator<T> {
 
     private final Iterator<String> fileNameIterator;
-    private final Options options;
-    private final ReadOptions readOptions;
+
     private String currentFile;
-    private SstFileReader currentFileReader;
-    private SstFileReaderIterator currentFileIterator;
+    private ClosableIterator<T> currentFileIterator;
 
-    private ManagedSstFileIterator(Collection<String> files)
-        throws RocksDBException {
-      // TODO: Check if default Options and ReadOptions is enough.
-      this.options = new Options();
-      this.readOptions = new ReadOptions();
+    private MultipleSstFileIterator(Collection<String> files)
+        throws IOException, RocksDBException,
+        NativeLibraryNotLoadedException {
       this.fileNameIterator = files.iterator();
+      init();
       moveToNextFile();
     }
 
+    protected abstract void init();
+
+    protected abstract ClosableIterator<T> getKeyIteratorForFile(String file)
+        throws RocksDBException, NativeLibraryNotLoadedException,
+        IOException;
+
     @Override
     public boolean hasNext() {
       try {
         do {
-          if (currentFileIterator.isValid()) {
+          if (currentFileIterator.hasNext()) {
             return true;
           }
         } while (moveToNextFile());
-      } catch (RocksDBException e) {
-        // TODO: This exception has to be handled by the caller.
+      } catch (IOException | RocksDBException |
+               NativeLibraryNotLoadedException e) {
+        // TODO: [Snapshot] This exception has to be handled by the caller.
         //  We have to do better exception handling.
         throw new RuntimeException(e);
       }
@@ -89,37 +206,36 @@ public class ManagedSstFileReader {
     }
 
     @Override
-    public String next() {
+    public T next() {
       if (hasNext()) {
-        final String value = new String(currentFileIterator.key(), UTF_8);
-        currentFileIterator.next();
-        return value;
+        return currentFileIterator.next();
       }
-      throw new NoSuchElementException("No more keys");
+      throw new NoSuchElementException("No more elements found.");
     }
 
     @Override
-    public void close() {
-      closeCurrentFile();
+    public void close() throws UncheckedIOException {
+      try {
+        closeCurrentFile();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
     }
 
-    private boolean moveToNextFile() throws RocksDBException {
+    private boolean moveToNextFile() throws IOException, RocksDBException,
+        NativeLibraryNotLoadedException {
       if (fileNameIterator.hasNext()) {
         closeCurrentFile();
         currentFile = fileNameIterator.next();
-        currentFileReader = new SstFileReader(options);
-        currentFileReader.open(currentFile);
-        currentFileIterator = currentFileReader.newIterator(readOptions);
-        currentFileIterator.seekToFirst();
+        this.currentFileIterator = getKeyIteratorForFile(currentFile);
         return true;
       }
       return false;
     }
 
-    private void closeCurrentFile() {
+    private void closeCurrentFile() throws IOException {
       if (currentFile != null) {
         currentFileIterator.close();
-        currentFileReader.close();
         currentFile = null;
       }
     }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 937835fdb7..34e17519e0 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -28,6 +28,17 @@ import org.apache.ratis.util.TimeDuration;
  * Ozone Manager Constants.
  */
 public final class OMConfigKeys {
+  public static final String
+      OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE =
+      "ozone.om.snapshot.sst_dumptool.pool.size";
+  public static final int
+      OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT = 1;
+  public static final String
+      OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE =
+      "ozone.om.snapshot.sst_dumptool.buffer.size";
+  public static final String
+      OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT = "8KB";
+
   /**
    * Never constructed.
    */
diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml
index 974565206d..a2b8b284d0 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -254,6 +254,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-params</artifactId>
     </dependency>
+      <dependency>
+          <groupId>org.apache.ozone</groupId>
+          <artifactId>hdds-rocks-native</artifactId>
+      </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 60e7ea7fd0..09a828e33d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -30,6 +30,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.NativeConstants;
+import org.apache.hadoop.hdds.utils.NativeLibraryLoader;
+import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -41,9 +49,11 @@ import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.hdds.utils.db.IntegerCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -73,6 +83,8 @@ import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.SynchronousQueue;
+
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getSnapshotInfo;
 import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
@@ -124,6 +136,9 @@ public class SnapshotDiffManager implements AutoCloseable {
   private final PersistentMap<String, SnapshotDiffJob> snapDiffJobTable;
   private final ExecutorService executorService;
 
+  private boolean isNativeRocksToolsLoaded;
+
+  private ManagedSSTDumpTool sstDumpTool;
 
   public SnapshotDiffManager(ManagedRocksDB db,
                              RocksDBCheckpointDiffer differ,
@@ -170,6 +185,36 @@ public class SnapshotDiffManager implements AutoCloseable {
     // TODO: [SNAPSHOT] Load jobs only if it is leader node.
     //  It could a event-triggered form OM when node is leader and up.
     this.loadJobsOnStartUp();
+    isNativeRocksToolsLoaded = NativeLibraryLoader.getInstance()
+            .loadLibrary(NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+    if (isNativeRocksToolsLoaded) {
+      isNativeRocksToolsLoaded = initSSTDumpTool(
+          ozoneManager.getConfiguration());
+    }
+  }
+
+  private boolean initSSTDumpTool(OzoneConfiguration conf) {
+    try {
+      int threadPoolSize = conf.getInt(
+              OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE,
+              OMConfigKeys
+                  .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT);
+      int bufferSize = (int) conf.getStorageSize(
+          OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE,
+          OMConfigKeys
+              .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT,
+              StorageUnit.BYTES);
+      ExecutorService execService = new ThreadPoolExecutor(0,
+              threadPoolSize, 60, TimeUnit.SECONDS,
+              new SynchronousQueue<>(), new ThreadFactoryBuilder()
+              .setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d")
+              .build(),
+              new ThreadPoolExecutor.DiscardPolicy());
+      sstDumpTool = new ManagedSSTDumpTool(execService, bufferSize);
+    } catch (NativeLibraryNotLoadedException e) {
+      return false;
+    }
+    return true;
   }
 
   private Map<String, String> getTablePrefixes(
@@ -209,6 +254,13 @@ public class SnapshotDiffManager implements AutoCloseable {
         getTablePrefixes(snapshotOMMM, volumeName, bucketName));
   }
 
+  private Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
+          List<String> tablesToLookUp) throws RocksDBException {
+    return RdbUtil.getSSTFilesForComparison(snapshot
+        .getMetadataManager().getStore().getDbLocation()
+        .getPath(), tablesToLookUp);
+  }
+
   @SuppressWarnings("parameternumber")
   public SnapshotDiffResponse getSnapshotDiffReport(
       final String volume,
@@ -408,7 +460,6 @@ public class SnapshotDiffManager implements AutoCloseable {
     ColumnFamilyHandle fromSnapshotColumnFamily = null;
     ColumnFamilyHandle toSnapshotColumnFamily = null;
     ColumnFamilyHandle objectIDsColumnFamily = null;
-
     try {
       // JobId is prepended to column families name to make them unique
       // for request.
@@ -428,30 +479,19 @@ public class SnapshotDiffManager implements AutoCloseable {
       // Note: Store objectId and keyName as byte array to reduce unnecessary
       // serialization and deserialization.
       final PersistentMap<byte[], byte[]> objectIdToKeyNameMapForFromSnapshot =
-          new RocksDbPersistentMap<>(db,
-              fromSnapshotColumnFamily,
-              codecRegistry,
-              byte[].class,
-              byte[].class);
-
+          new RocksDbPersistentMap<>(db, fromSnapshotColumnFamily,
+              codecRegistry, byte[].class, byte[].class);
       // ObjectId to keyName map to keep key info for toSnapshot.
       final PersistentMap<byte[], byte[]> objectIdToKeyNameMapForToSnapshot =
-          new RocksDbPersistentMap<>(db,
-              toSnapshotColumnFamily,
-              codecRegistry,
-              byte[].class,
-              byte[].class);
-
+          new RocksDbPersistentMap<>(db, toSnapshotColumnFamily, codecRegistry,
+              byte[].class, byte[].class);
       // Set of unique objectId between fromSnapshot and toSnapshot.
       final PersistentSet<byte[]> objectIDsToCheckMap =
-          new RocksDbPersistentSet<>(db,
-              objectIDsColumnFamily,
-              codecRegistry,
+          new RocksDbPersistentSet<>(db, objectIDsColumnFamily, codecRegistry,
               byte[].class);
 
       final BucketLayout bucketLayout = getBucketLayout(volume, bucket,
           fromSnapshot.getMetadataManager());
-
       final Table<String, OmKeyInfo> fsKeyTable =
           fromSnapshot.getMetadataManager().getKeyTable(bucketLayout);
       final Table<String, OmKeyInfo> tsKeyTable =
@@ -466,40 +506,82 @@ public class SnapshotDiffManager implements AutoCloseable {
 
       Map<String, String> tablePrefixes =
           getTablePrefixes(toSnapshot.getMetadataManager(), volume, bucket);
-
-      final Set<String> deltaFilesForKeyOrFileTable =
-          getDeltaFiles(fromSnapshot, toSnapshot,
-              Collections.singletonList(fsKeyTable.getName()), fsInfo, tsInfo,
+      List<String> tablesToLookUp =
+              Collections.singletonList(fsKeyTable.getName());
+      final Set<String> deltaFilesForKeyOrFileTable = getDeltaFiles(
+              fromSnapshot, toSnapshot, tablesToLookUp, fsInfo, tsInfo,
               useFullDiff, tablePrefixes);
 
-      addToObjectIdMap(fsKeyTable,
-          tsKeyTable,
-          deltaFilesForKeyOrFileTable,
-          objectIdToKeyNameMapForFromSnapshot,
-          objectIdToKeyNameMapForToSnapshot,
-          objectIDsToCheckMap,
-          tablePrefixes);
+      // Workaround to handle deletes if native rockstools for reading
+      // tombstone is not loaded.
+      // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+      if (!isNativeRocksToolsLoaded) {
+        deltaFilesForKeyOrFileTable.addAll(getSSTFileListForSnapshot(
+                fromSnapshot, tablesToLookUp));
+      }
+      try {
+        addToObjectIdMap(fsKeyTable, tsKeyTable,
+            Pair.of(isNativeRocksToolsLoaded, deltaFilesForKeyOrFileTable),
+            objectIdToKeyNameMapForFromSnapshot,
+            objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+            tablePrefixes);
+      } catch (NativeLibraryNotLoadedException e) {
+        // Workaround to handle deletes if use of native rockstools for reading
+        // tombstone fails.
+        // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+        deltaFilesForKeyOrFileTable.addAll(getSSTFileListForSnapshot(
+                fromSnapshot, tablesToLookUp));
+        try {
+          addToObjectIdMap(fsKeyTable, tsKeyTable,
+              Pair.of(false, deltaFilesForKeyOrFileTable),
+              objectIdToKeyNameMapForFromSnapshot,
+              objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+              tablePrefixes);
+        } catch (NativeLibraryNotLoadedException ex) {
+          // This code should be never executed.
+          throw new IllegalStateException(ex);
+        }
+      }
 
       if (bucketLayout.isFileSystemOptimized()) {
         final Table<String, OmDirectoryInfo> fsDirTable =
             fromSnapshot.getMetadataManager().getDirectoryTable();
         final Table<String, OmDirectoryInfo> tsDirTable =
             toSnapshot.getMetadataManager().getDirectoryTable();
+        tablesToLookUp = Collections.singletonList(fsDirTable.getName());
         final Set<String> deltaFilesForDirTable =
-            getDeltaFiles(fromSnapshot, toSnapshot,
-                Collections.singletonList(fsDirTable.getName()), fsInfo, tsInfo,
-                useFullDiff, tablePrefixes);
-        addToObjectIdMap(fsDirTable,
-            tsDirTable,
-            deltaFilesForDirTable,
-            objectIdToKeyNameMapForFromSnapshot,
-            objectIdToKeyNameMapForToSnapshot,
-            objectIDsToCheckMap,
-            tablePrefixes);
+            getDeltaFiles(fromSnapshot, toSnapshot, tablesToLookUp, fsInfo,
+                    tsInfo, useFullDiff, tablePrefixes);
+        if (!isNativeRocksToolsLoaded) {
+          deltaFilesForDirTable.addAll(getSSTFileListForSnapshot(
+                  fromSnapshot, tablesToLookUp));
+        }
+        try {
+          addToObjectIdMap(fsDirTable, tsDirTable,
+              Pair.of(isNativeRocksToolsLoaded, deltaFilesForDirTable),
+              objectIdToKeyNameMapForFromSnapshot,
+              objectIdToKeyNameMapForToSnapshot,
+              objectIDsToCheckMap,
+              tablePrefixes);
+        } catch (NativeLibraryNotLoadedException e) {
+          try {
+            // Workaround to handle deletes if use of native rockstools for
+            // reading tombstone fails.
+            // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+            deltaFilesForDirTable.addAll(getSSTFileListForSnapshot(
+                    fromSnapshot, tablesToLookUp));
+            addToObjectIdMap(fsDirTable, tsDirTable,
+                Pair.of(false, deltaFilesForDirTable),
+                objectIdToKeyNameMapForFromSnapshot,
+                objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+                tablePrefixes);
+          } catch (NativeLibraryNotLoadedException ex) {
+            // This code should be never executed.
+            throw new IllegalStateException(ex);
+          }
+        }
       }
-
-      generateDiffReport(jobId,
-          objectIDsToCheckMap,
+      generateDiffReport(jobId, objectIDsToCheckMap,
           objectIdToKeyNameMapForFromSnapshot,
           objectIdToKeyNameMapForToSnapshot);
       updateJobStatus(jobKey, IN_PROGRESS, DONE);
@@ -517,22 +599,26 @@ public class SnapshotDiffManager implements AutoCloseable {
 
   private void addToObjectIdMap(Table<String, ? extends WithObjectID> fsTable,
                                 Table<String, ? extends WithObjectID> tsTable,
-                                Set<String> deltaFiles,
+                                Pair<Boolean, Set<String>>
+                                    isNativeRocksToolsLoadedDeltaFilesPair,
                                 PersistentMap<byte[], byte[]> oldObjIdToKeyMap,
                                 PersistentMap<byte[], byte[]> newObjIdToKeyMap,
                                 PersistentSet<byte[]> objectIDsToCheck,
                                 Map<String, String> tablePrefixes)
-      throws IOException {
+      throws IOException, NativeLibraryNotLoadedException {
 
+    Set<String> deltaFiles = isNativeRocksToolsLoadedDeltaFilesPair.getRight();
     if (deltaFiles.isEmpty()) {
       return;
     }
-
+    boolean nativeRocksToolsLoaded =
+        isNativeRocksToolsLoadedDeltaFilesPair.getLeft();
     boolean isDirectoryTable =
         fsTable.getName().equals(OmMetadataManagerImpl.DIRECTORY_TABLE);
-
-    try (Stream<String> keysToCheck = new ManagedSstFileReader(deltaFiles)
-        .getKeyStream()) {
+    ManagedSstFileReader sstFileReader = new ManagedSstFileReader(deltaFiles);
+    try (Stream<String> keysToCheck = nativeRocksToolsLoaded
+                 ? sstFileReader.getKeyStreamWithTombstone(sstDumpTool)
+                 : sstFileReader.getKeyStream()) {
       keysToCheck.forEach(key -> {
         try {
           final WithObjectID oldKey = fsTable.get(key);
@@ -602,19 +688,6 @@ public class SnapshotDiffManager implements AutoCloseable {
       List<String> sstDiffList =
           differ.getSSTDiffListWithFullPath(toDSI, fromDSI);
       deltaFiles.addAll(sstDiffList);
-
-      // TODO: [SNAPSHOT] Remove the workaround below when the SnapDiff logic
-      //  can read tombstones in SST files.
-      // Workaround: Append "From DB" SST files to the deltaFiles list so that
-      //  the current SnapDiff logic correctly handles deleted keys.
-      if (!deltaFiles.isEmpty()) {
-        Set<String> fromSnapshotFiles = RdbUtil.getSSTFilesForComparison(
-            fromSnapshot.getMetadataManager()
-                .getStore().getDbLocation().getPath(),
-            tablesToLookUp);
-        deltaFiles.addAll(fromSnapshotFiles);
-      }
-      // End of Workaround
     }
 
     if (useFullDiff || deltaFiles.isEmpty()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to