smengcl commented on code in PR #4376:
URL: https://github.com/apache/ozone/pull/4376#discussion_r1139828081


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java:
##########


Review Comment:
   Is `getSSTFilesForComparison()` here in this class filtering out SSTs with 
only default CF on line 59?
   
   Or is it just a dummy CF name here for the SST reader which doesn't care?
   
   It is called from `getSSTFileListForSnapshot()`, hence the question.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -131,6 +177,13 @@ private DifferSnapshotInfo getDSIFromSI(SnapshotInfo 
snapshotInfo,
         getTablePrefixes(snapshotOMMM, volumeName, bucketName));
   }
 
+  private Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
+          List<String> tablesToLookUp) throws RocksDBException {
+    return RdbUtil.getSSTFilesForComparison(snapshot
+            .getMetadataManager().getStore().getDbLocation()
+                    .getPath(), tablesToLookUp);
+  }
+
   @SuppressWarnings("checkstyle:methodlength")
   public SnapshotDiffReport getSnapshotDiffReport(final String volume,

Review Comment:
   We need more test cases to cover the branches added to this method.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java:
##########
@@ -46,41 +47,130 @@ public class ManagedSstFileReader {
   public ManagedSstFileReader(final Collection<String> sstFiles) {
     this.sstFiles = sstFiles;
   }
-  public Stream<String> getKeyStream() throws RocksDBException {
-    final ManagedSstFileIterator itr = new ManagedSstFileIterator(sstFiles);
-    final Spliterator<String> spliterator = Spliterators
-        .spliteratorUnknownSize(itr, 0);
-    return StreamSupport.stream(spliterator, false).onClose(itr::close);
+
+  public Stream<String> getKeyStream() throws RocksDBException,
+          NativeLibraryNotLoadedException, IOException {
+    //TODO: [SNAPSHOT] Check if default Options and ReadOptions is enough.
+    ManagedOptions options = new ManagedOptions();
+    ReadOptions readOptions = new ReadOptions();
+    final MultipleSstFileIterator<String> itr =
+            new MultipleSstFileIterator<String>(sstFiles) {
+          @Override
+          protected HddsUtils.CloseableIterator<String>
+              initNewKeyIteratorForFile(String file) throws RocksDBException {
+            return new ManagedSstFileIterator<String>(file, options,
+                    readOptions) {
+              @Override
+              protected String getIteratorValue(
+                      SstFileReaderIterator iterator) {
+                return new String(iterator.key(), UTF_8);
+              }
+            };
+          }
+
+          @Override
+          public void close() throws IOException {
+            super.close();
+            options.close();
+            readOptions.close();
+          }
+        };
+    return RdbUtil.getStreamFromIterator(itr);
+  }
+  public Stream<String> getKeyStreamWithTombstone(
+          ManagedSSTDumpTool sstDumpTool) throws IOException, RocksDBException,
+          NativeLibraryNotLoadedException {
+    //TODO: [SNAPSHOT] Check if default Options is enough.
+    ManagedOptions options = new ManagedOptions();
+    final MultipleSstFileIterator<String> itr =
+            new MultipleSstFileIterator<String>(sstFiles) {
+          @Override
+          protected HddsUtils.CloseableIterator<String>
+              initNewKeyIteratorForFile(String file)
+                  throws NativeLibraryNotLoadedException, IOException {
+            return new ManagedSSTDumpIterator<String>(sstDumpTool, file,
+                    options) {
+              @Override
+              protected String getTransformedValue(KeyValue value) {
+                return value.getKey();
+              }
+            };
+          }
+
+          @Override
+          public void close() throws IOException {
+            super.close();
+            options.close();
+          }
+        };
+    return RdbUtil.getStreamFromIterator(itr);
+  }
+
+  private abstract static class ManagedSstFileIterator<T> implements
+          HddsUtils.CloseableIterator<T> {
+    private SstFileReader fileReader;
+    private SstFileReaderIterator fileReaderIterator;
+
+    ManagedSstFileIterator(String path, ManagedOptions options,
+                           ReadOptions readOptions)
+            throws RocksDBException {
+      this.fileReader = new SstFileReader(options);
+      this.fileReader.open(path);
+      this.fileReaderIterator = fileReader.newIterator(readOptions);
+      fileReaderIterator.seekToFirst();
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.fileReaderIterator.close();
+      this.fileReader.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return fileReaderIterator.isValid();
+    }
+
+    protected abstract T getIteratorValue(SstFileReaderIterator iterator);
+
+    @Override
+    public T next() {
+      T value = getIteratorValue(fileReaderIterator);
+      fileReaderIterator.next();
+      return value;
+    }
   }
 
-  private static final class ManagedSstFileIterator implements
-      Iterator<String>, Closeable {
+  private abstract static class MultipleSstFileIterator<T> implements
+          HddsUtils.CloseableIterator<T> {
 
     private final Iterator<String> fileNameIterator;
-    private final Options options;
-    private final ReadOptions readOptions;
+
     private String currentFile;
     private SstFileReader currentFileReader;
-    private SstFileReaderIterator currentFileIterator;
+    private HddsUtils.CloseableIterator<T> currentFileIterator;
 
-    private ManagedSstFileIterator(Collection<String> files)
-        throws RocksDBException {
-      // TODO: Check if default Options and ReadOptions is enough.
-      this.options = new Options();
-      this.readOptions = new ReadOptions();
+    private MultipleSstFileIterator(Collection<String> files)
+            throws IOException, RocksDBException,
+            NativeLibraryNotLoadedException {
       this.fileNameIterator = files.iterator();
       moveToNextFile();
     }
 
+    protected abstract HddsUtils.CloseableIterator<T> 
initNewKeyIteratorForFile(

Review Comment:
   ```suggestion
       protected abstract HddsUtils.CloseableIterator<T> getKeyIteratorForFile(
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -92,6 +107,37 @@ public SnapshotDiffManager(ManagedRocksDB db,
     // Need for Diff Report
     this.codecRegistry.addCodec(DiffReportEntry.class,
         new OmDBDiffReportEntryCodec());
+    isNativeRocksToolsLoaded = NativeLibraryLoader.getInstance()
+            .loadLibrary(NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+    if (isNativeRocksToolsLoaded) {
+      try {
+        initSSTDumpTool(configuration);
+        isNativeRocksToolsLoaded = true;
+      } catch (NativeLibraryNotLoadedException e) {
+        LOG.error("Unable to load SSTDumpTool ", e);
+        isNativeRocksToolsLoaded = false;
+      }
+    }
+  }
+
+  private void initSSTDumpTool(OzoneConfiguration conf)
+          throws NativeLibraryNotLoadedException {
+    int threadPoolSize = conf.getInt(
+            OzoneConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE,
+            OzoneConfigKeys
+                    
.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT);
+    int bufferSize = (int)conf.getStorageSize(

Review Comment:
   ```suggestion
       int bufferSize = (int) conf.getStorageSize(
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -217,37 +270,91 @@ public SnapshotDiffReport getSnapshotDiffReport(final 
String volume,
 
       Map<String, String> tablePrefixes =
           getTablePrefixes(toSnapshot.getMetadataManager(), volume, bucket);
-
-      final Set<String> deltaFilesForKeyOrFileTable =
-          getDeltaFiles(fromSnapshot, toSnapshot,
-              Collections.singletonList(fsKeyTable.getName()), fsInfo, tsInfo,
+      List<String> tablesToLookUp =
+              Collections.singletonList(fsKeyTable.getName());

Review Comment:
   It looks like we will never need to explicitly add `fileTable` here, since 
`getKeyTable()` automatically picks the right one. In that case, can we just 
use a `String` rather than a `List`?
   
   Please also add more inline code comments here.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -92,6 +107,37 @@ public SnapshotDiffManager(ManagedRocksDB db,
     // Need for Diff Report
     this.codecRegistry.addCodec(DiffReportEntry.class,
         new OmDBDiffReportEntryCodec());
+    isNativeRocksToolsLoaded = NativeLibraryLoader.getInstance()
+            .loadLibrary(NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+    if (isNativeRocksToolsLoaded) {
+      try {
+        initSSTDumpTool(configuration);
+        isNativeRocksToolsLoaded = true;
+      } catch (NativeLibraryNotLoadedException e) {
+        LOG.error("Unable to load SSTDumpTool ", e);

Review Comment:
   ```suggestion
           LOG.error("Unable to load SSTDumpTool", e);
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java:
##########
@@ -46,41 +47,130 @@ public class ManagedSstFileReader {
   public ManagedSstFileReader(final Collection<String> sstFiles) {
     this.sstFiles = sstFiles;
   }
-  public Stream<String> getKeyStream() throws RocksDBException {
-    final ManagedSstFileIterator itr = new ManagedSstFileIterator(sstFiles);
-    final Spliterator<String> spliterator = Spliterators
-        .spliteratorUnknownSize(itr, 0);
-    return StreamSupport.stream(spliterator, false).onClose(itr::close);
+
+  public Stream<String> getKeyStream() throws RocksDBException,
+          NativeLibraryNotLoadedException, IOException {
+    //TODO: [SNAPSHOT] Check if default Options and ReadOptions is enough.
+    ManagedOptions options = new ManagedOptions();
+    ReadOptions readOptions = new ReadOptions();
+    final MultipleSstFileIterator<String> itr =
+            new MultipleSstFileIterator<String>(sstFiles) {
+          @Override
+          protected HddsUtils.CloseableIterator<String>
+              initNewKeyIteratorForFile(String file) throws RocksDBException {
+            return new ManagedSstFileIterator<String>(file, options,
+                    readOptions) {
+              @Override
+              protected String getIteratorValue(
+                      SstFileReaderIterator iterator) {
+                return new String(iterator.key(), UTF_8);
+              }
+            };
+          }
+
+          @Override
+          public void close() throws IOException {
+            super.close();
+            options.close();
+            readOptions.close();
+          }
+        };
+    return RdbUtil.getStreamFromIterator(itr);
+  }
+  public Stream<String> getKeyStreamWithTombstone(
+          ManagedSSTDumpTool sstDumpTool) throws IOException, RocksDBException,
+          NativeLibraryNotLoadedException {
+    //TODO: [SNAPSHOT] Check if default Options is enough.
+    ManagedOptions options = new ManagedOptions();
+    final MultipleSstFileIterator<String> itr =
+            new MultipleSstFileIterator<String>(sstFiles) {
+          @Override
+          protected HddsUtils.CloseableIterator<String>
+              initNewKeyIteratorForFile(String file)
+                  throws NativeLibraryNotLoadedException, IOException {
+            return new ManagedSSTDumpIterator<String>(sstDumpTool, file,
+                    options) {
+              @Override
+              protected String getTransformedValue(KeyValue value) {
+                return value.getKey();
+              }
+            };
+          }
+
+          @Override
+          public void close() throws IOException {
+            super.close();
+            options.close();
+          }
+        };
+    return RdbUtil.getStreamFromIterator(itr);
+  }
+
+  private abstract static class ManagedSstFileIterator<T> implements
+          HddsUtils.CloseableIterator<T> {
+    private SstFileReader fileReader;
+    private SstFileReaderIterator fileReaderIterator;
+
+    ManagedSstFileIterator(String path, ManagedOptions options,
+                           ReadOptions readOptions)
+            throws RocksDBException {
+      this.fileReader = new SstFileReader(options);
+      this.fileReader.open(path);
+      this.fileReaderIterator = fileReader.newIterator(readOptions);
+      fileReaderIterator.seekToFirst();
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.fileReaderIterator.close();
+      this.fileReader.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return fileReaderIterator.isValid();
+    }
+
+    protected abstract T getIteratorValue(SstFileReaderIterator iterator);
+
+    @Override
+    public T next() {
+      T value = getIteratorValue(fileReaderIterator);
+      fileReaderIterator.next();
+      return value;
+    }
   }
 
-  private static final class ManagedSstFileIterator implements
-      Iterator<String>, Closeable {
+  private abstract static class MultipleSstFileIterator<T> implements
+          HddsUtils.CloseableIterator<T> {
 
     private final Iterator<String> fileNameIterator;
-    private final Options options;
-    private final ReadOptions readOptions;
+
     private String currentFile;
     private SstFileReader currentFileReader;

Review Comment:
   This doesn't seem to be used anywhere.
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to