This is an automated email from the ASF dual-hosted git repository.

hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ed9f44cc8 HDDS-8196. DB listener to only track SSTs that belong to the tables of interest. (#5511)
2ed9f44cc8 is described below

commit 2ed9f44cc87ec848a5bb5bfa5ed87b14aa784fdf
Author: Hemant Kumar <[email protected]>
AuthorDate: Thu Nov 16 08:44:41 2023 -0800

    HDDS-8196. DB listener to only track SSTs that belong to the tables of interest. (#5511)
---
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 146 +++++++++++----------
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 120 +++++++++++++++++
 .../org/apache/hadoop/ozone/om/TestOmSnapshot.java | 133 ++++++++++++++++++-
 3 files changed, 326 insertions(+), 73 deletions(-)
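
In short, the change routes both RocksDB compaction listeners through a shared shouldSkipCompaction() check so that only SST files belonging to the keyTable, directoryTable and fileTable column families are added to the compaction DAG and hard-linked into the SST backup directory. The snippet below is a minimal sketch of that listener-side filtering, not part of the patch that follows; it uses only the org.rocksdb API that appears in the diff, and the class name TrackedCfCompactionListener and the recordCompaction() helper are illustrative placeholders rather than Ozone code:

    import java.nio.charset.StandardCharsets;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.rocksdb.AbstractEventListener;
    import org.rocksdb.CompactionJobInfo;
    import org.rocksdb.RocksDB;

    public class TrackedCfCompactionListener extends AbstractEventListener {

      // Only these column families matter for snapshot diff tracking.
      private static final Set<String> TRACKED_CFS =
          new HashSet<>(List.of("keyTable", "directoryTable", "fileTable"));

      @Override
      public void onCompactionBegin(RocksDB db, CompactionJobInfo info) {
        String cf = new String(info.columnFamilyName(), StandardCharsets.UTF_8);
        // Skip untracked column families, empty compactions, and trivial
        // compactions whose input and output SST sets are identical.
        if (!TRACKED_CFS.contains(cf)
            || info.inputFiles().isEmpty()
            || new HashSet<>(info.inputFiles())
                .equals(new HashSet<>(info.outputFiles()))) {
          return;
        }
        recordCompaction(cf, info.inputFiles(), info.outputFiles());
      }

      // Placeholder for DAG bookkeeping / hard-linking of input SST files.
      private void recordCompaction(String cf, List<String> in, List<String> out) {
        // ...
      }
    }

In the actual change the same check also guards onCompactionCompleted(), so neither listener does any bookkeeping for column families outside the tracked set.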

diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 605c189514..b6d6e773df 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -19,6 +19,7 @@ package org.apache.ozone.rocksdiff;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.graph.GraphBuilder;
 import com.google.common.graph.MutableGraph;
 
@@ -90,17 +91,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTI
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT;
 
-// TODO
-//  8. Handle bootstrapping rocksDB for a new OM follower node
-//      - new node will receive Active object store as well as all existing
-//      rocksDB checkpoints.
-//      - This bootstrapping should also receive the compaction-DAG information
-//  9. Handle rebuilding the DAG for a lagging follower. There are two cases
-//      - receive RATIS transactions to replay. Nothing needs to be done in
-//      these cases.
-//      - Getting the DB sync. This case needs to handle getting the
-//      compaction-DAG information as well.
-
 /**
  * RocksDB checkpoint differ.
  * <p>
@@ -188,6 +178,13 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   private ColumnFamilyHandle compactionLogTableCFHandle;
   private RocksDB activeRocksDB;
 
+  /**
+   * For snapshot diff calculation we only need to track the following column
+   * families. Other column families are irrelevant for snapshot diff.
+   */
+  public static final Set<String> COLUMN_FAMILIES_TO_TRACK_IN_DAG =
+      ImmutableSet.of("keyTable", "directoryTable", "fileTable");
+
   /**
    * This is a package private constructor and should not be used other than
    * testing. Caller should use RocksDBCheckpointDifferHolder#getInstance() to
@@ -445,19 +442,42 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     }
   }
 
+  @VisibleForTesting
+  boolean shouldSkipCompaction(byte[] columnFamilyBytes,
+                               List<String> inputFiles,
+                               List<String> outputFiles) {
+    String columnFamily = StringUtils.bytes2String(columnFamilyBytes);
+
+    if (!COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(columnFamily)) {
+      LOG.debug("Skipping compaction for columnFamily: {}", columnFamily);
+      return true;
+    }
+
+    if (inputFiles.isEmpty()) {
+      LOG.debug("Compaction input files list is empty");
+      return true;
+    }
+
+    if (new HashSet<>(inputFiles).equals(new HashSet<>(outputFiles))) {
+      LOG.info("Skipped the compaction entry. Compaction input files: " +
+          "{} and output files: {} are same.", inputFiles, outputFiles);
+      return true;
+    }
+
+    return false;
+  }
+
   private AbstractEventListener newCompactionBeginListener() {
     return new AbstractEventListener() {
       @Override
       public void onCompactionBegin(RocksDB db,
                                     CompactionJobInfo compactionJobInfo) {
-        if (compactionJobInfo.inputFiles().size() == 0) {
-          LOG.error("Compaction input files list is empty");
+        if (shouldSkipCompaction(compactionJobInfo.columnFamilyName(),
+            compactionJobInfo.inputFiles(),
+            compactionJobInfo.outputFiles())) {
           return;
         }
 
-        // Note the current compaction listener implementation does not
-        // differentiate which column family each SST store. It is tracking
-        // all SST files.
         synchronized (this) {
           if (closed) {
             return;
@@ -470,10 +490,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
           }
         }
 
-        // Create hardlink backups for the SST files that are going
-        // to be deleted after this RDB compaction.
         for (String file : compactionJobInfo.inputFiles()) {
-          LOG.debug("Creating hard link for '{}'", file);
           createLink(Paths.get(sstBackupDir, new File(file).getName()),
               Paths.get(file));
         }
@@ -481,23 +498,15 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     };
   }
 
+
   private AbstractEventListener newCompactionCompletedListener() {
     return new AbstractEventListener() {
       @Override
       public void onCompactionCompleted(RocksDB db,
-          CompactionJobInfo compactionJobInfo) {
-
-        if (compactionJobInfo.inputFiles().isEmpty()) {
-          LOG.error("Compaction input files list is empty");
-          return;
-        }
-
-        if (new HashSet<>(compactionJobInfo.inputFiles())
-            .equals(new HashSet<>(compactionJobInfo.outputFiles()))) {
-          LOG.info("Skipped the compaction entry. Compaction input files: " +
-                  "{} and output files: {} are same.",
-              compactionJobInfo.inputFiles(),
-              compactionJobInfo.outputFiles());
+                                        CompactionJobInfo compactionJobInfo) {
+        if (shouldSkipCompaction(compactionJobInfo.columnFamilyName(),
+            compactionJobInfo.inputFiles(),
+            compactionJobInfo.outputFiles())) {
           return;
         }
 
@@ -534,14 +543,13 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
 
           waitForTarballCreation();
 
-
           // Add the compaction log entry to Compaction log table.
           addToCompactionLogTable(compactionLogEntry);
 
           // Populate the DAG
           populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
               compactionLogEntry.getOutputFileInfoList(),
-              db.getLatestSequenceNumber());
+              compactionLogEntry.getDbSequenceNumber());
         }
       }
     };
@@ -1530,50 +1538,56 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
     graph.generateImage(filePath);
   }
 
-  private List<CompactionFileInfo> toFileInfoList(
-      List<String> sstFiles,
-      ManagedOptions options,
-      ManagedReadOptions readOptions
+  private List<CompactionFileInfo> toFileInfoList(List<String> sstFiles,
+                                                  ManagedOptions options,
+                                                  ManagedReadOptions readOptions
   ) {
     if (CollectionUtils.isEmpty(sstFiles)) {
       return Collections.emptyList();
     }
 
-    final int fileNameOffset = sstFiles.get(0).lastIndexOf("/") + 1;
     List<CompactionFileInfo> response = new ArrayList<>();
 
     for (String sstFile : sstFiles) {
-      String fileName = sstFile.substring(fileNameOffset,
-          sstFile.length() - SST_FILE_EXTENSION_LENGTH);
-      CompactionFileInfo.Builder fileInfoBuilder =
-          new CompactionFileInfo.Builder(fileName);
-      SstFileReader fileReader = new SstFileReader(options);
-      try {
-        fileReader.open(sstFile);
-        String columnFamily = StringUtils.bytes2String(
-            fileReader.getTableProperties().getColumnFamilyName());
-        SstFileReaderIterator iterator = fileReader.newIterator(readOptions);
-        iterator.seekToFirst();
-        String startKey = StringUtils.bytes2String(iterator.key());
-        iterator.seekToLast();
-        String endKey = StringUtils.bytes2String(iterator.key());
-        fileInfoBuilder.setStartRange(startKey)
-            .setEndRange(endKey)
-            .setColumnFamily(columnFamily);
-      } catch (RocksDBException rocksDBException) {
-        // Ideally it should not happen. If it does just log the exception.
-        // And let the compaction complete without the exception.
-        // Throwing exception in compaction listener could fail the RocksDB.
-        // In case of exception, compaction node will be missing start key,
-        // end key and column family. And it will continue the traversal as
-        // it was before HDDS-8940.
-        LOG.warn("Failed to read SST file: {}.", sstFile, rocksDBException);
-      }
-      response.add(fileInfoBuilder.build());
+      CompactionFileInfo fileInfo = toFileInfo(sstFile, options, readOptions);
+      response.add(fileInfo);
     }
     return response;
   }
 
+  private CompactionFileInfo toFileInfo(String sstFile,
+                                        ManagedOptions options,
+                                        ManagedReadOptions readOptions) {
+    final int fileNameOffset = sstFile.lastIndexOf("/") + 1;
+    String fileName = sstFile.substring(fileNameOffset,
+        sstFile.length() - SST_FILE_EXTENSION_LENGTH);
+    CompactionFileInfo.Builder fileInfoBuilder =
+        new CompactionFileInfo.Builder(fileName);
+
+    try (SstFileReader fileReader = new SstFileReader(options)) {
+      fileReader.open(sstFile);
+      String columnFamily = StringUtils.bytes2String(
+          fileReader.getTableProperties().getColumnFamilyName());
+      SstFileReaderIterator iterator = fileReader.newIterator(readOptions);
+      iterator.seekToFirst();
+      String startKey = StringUtils.bytes2String(iterator.key());
+      iterator.seekToLast();
+      String endKey = StringUtils.bytes2String(iterator.key());
+      fileInfoBuilder.setStartRange(startKey)
+          .setEndRange(endKey)
+          .setColumnFamily(columnFamily);
+    } catch (RocksDBException rocksDBException) {
+      // Ideally it should not happen. If it does just log the exception.
+      // And let the compaction complete without the exception.
+      // Throwing exception in compaction listener could fail the RocksDB.
+      // In case of exception, compaction node will be missing start key,
+      // end key and column family. And during diff calculation it will
+      // continue the traversal as it was before HDDS-8940.
+      LOG.warn("Failed to read SST file: {}.", sstFile, rocksDBException);
+    }
+    return fileInfoBuilder.build();
+  }
+
   @VisibleForTesting
   boolean shouldSkipNode(CompactionNode node,
                          Map<String, String> columnFamilyToPrefixMap) {
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index b15089b574..7ce7974581 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -56,6 +56,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
@@ -82,6 +83,7 @@ import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.SstFileReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
@@ -91,6 +93,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTI
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT;
 import static org.apache.hadoop.util.Time.now;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
@@ -133,6 +136,8 @@ public class TestRocksDBCheckpointDiffer {
   private RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
   private RocksDB activeRocksDB;
   private ColumnFamilyHandle keyTableCFHandle;
+  private ColumnFamilyHandle directoryTableCFHandle;
+  private ColumnFamilyHandle fileTableCFHandle;
   private ColumnFamilyHandle compactionLogTableCFHandle;
 
   @BeforeEach
@@ -184,6 +189,8 @@ public class TestRocksDBCheckpointDiffer {
     activeRocksDB = RocksDB.open(dbOptions, activeDbDirName, cfDescriptors,
         cfHandles);
     keyTableCFHandle = cfHandles.get(1);
+    directoryTableCFHandle = cfHandles.get(2);
+    fileTableCFHandle = cfHandles.get(3);
     compactionLogTableCFHandle = cfHandles.get(4);
 
     rocksDBCheckpointDiffer.setCompactionLogTableCFHandle(cfHandles.get(4));
@@ -207,6 +214,8 @@ public class TestRocksDBCheckpointDiffer {
   public void cleanUp() {
     IOUtils.closeQuietly(rocksDBCheckpointDiffer);
     IOUtils.closeQuietly(keyTableCFHandle);
+    IOUtils.closeQuietly(directoryTableCFHandle);
+    IOUtils.closeQuietly(fileTableCFHandle);
     IOUtils.closeQuietly(compactionLogTableCFHandle);
     IOUtils.closeQuietly(activeRocksDB);
     deleteDirectory(compactionLogDir);
@@ -1819,4 +1828,115 @@ public class TestRocksDBCheckpointDiffer {
     assertEquals(expectedResponse, rocksDBCheckpointDiffer.shouldSkipNode(node,
         columnFamilyToPrefixMap));
   }
+
+  private void createKeys(ColumnFamilyHandle cfh,
+                          String keyPrefix,
+                          String valuePrefix,
+                          int numberOfKeys) throws RocksDBException {
+
+    for (int i = 0; i < numberOfKeys; ++i) {
+      String generatedString = RandomStringUtils.randomAlphabetic(7);
+      String keyStr = keyPrefix + i + "-" + generatedString;
+      String valueStr = valuePrefix + i + "-" + generatedString;
+      byte[] key = keyStr.getBytes(UTF_8);
+      activeRocksDB.put(cfh, key, valueStr.getBytes(UTF_8));
+      if (i % 10 == 0) {
+        activeRocksDB.flush(new FlushOptions(), cfh);
+      }
+    }
+  }
+
+  // End-to-end test to verify that only 'keyTable', 'directoryTable'
+  // and 'fileTable' column families' SST files are added to the compaction DAG.
+  @Test
+  public void testDagOnlyContainsDesiredCfh()
+      throws RocksDBException, IOException {
+    // Setting a non-empty table so that 'isSnapshotInfoTableEmpty'
+    // returns true.
+    rocksDBCheckpointDiffer.setSnapshotInfoTableCFHandle(keyTableCFHandle);
+    createKeys(keyTableCFHandle, "keyName-", "keyValue-", 100);
+    createKeys(directoryTableCFHandle, "dirName-", "dirValue-", 100);
+    createKeys(fileTableCFHandle, "fileName-", "fileValue-", 100);
+    createKeys(compactionLogTableCFHandle, "logName-", "logValue-", 100);
+
+    // Make sure that some compaction happened.
+    assertFalse(rocksDBCheckpointDiffer.getCompactionNodeMap().isEmpty());
+
+    List<CompactionNode> compactionNodes = rocksDBCheckpointDiffer.
+        getCompactionNodeMap().values().stream()
+        .filter(node -> !COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(
+            node.getColumnFamily()))
+        .collect(Collectors.toList());
+
+    // CompactionNodeMap should not contain any node other than 'keyTable',
+    // 'directoryTable' and 'fileTable' column families nodes.
+    assertTrue(compactionNodes.isEmpty());
+
+    // Assert that only 'keyTable', 'directoryTable' and 'fileTable'
+    // column families SST files are backed-up.
+    try (ManagedOptions options = new ManagedOptions();
+         Stream<Path> pathStream = Files.list(
+             Paths.get(rocksDBCheckpointDiffer.getSSTBackupDir()))) {
+      pathStream.forEach(path -> {
+        try (SstFileReader fileReader = new SstFileReader(options)) {
+          fileReader.open(path.toAbsolutePath().toString());
+          String columnFamily = StringUtils.bytes2String(
+              fileReader.getTableProperties().getColumnFamilyName());
+          assertTrue(COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(columnFamily));
+        } catch (RocksDBException rocksDBException) {
+          fail("Failed to read file: " + path.toAbsolutePath());
+        }
+      });
+    }
+  }
+
+  private static Stream<Arguments> shouldSkipFileCases() {
+    return Stream.of(
+        Arguments.of("Case#1: volumeTable is irrelevant column family.",
+            "volumeTable".getBytes(UTF_8),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"),
+            Arrays.asList("outputFile1", "outputFile2"), true),
+        Arguments.of("Case#2: bucketTable is irrelevant column family.",
+            "bucketTable".getBytes(UTF_8),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"),
+            Arrays.asList("outputFile1", "outputFile2"), true),
+        Arguments.of("Case#3: snapshotInfoTable is irrelevant column family.",
+            "snapshotInfoTable".getBytes(UTF_8),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"),
+            Arrays.asList("outputFile1", "outputFile2"), true),
+        Arguments.of("Case#4: compactionLogTable is irrelevant column family.",
+            "compactionLogTable".getBytes(UTF_8),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"),
+            Arrays.asList("outputFile1", "outputFile2"), true),
+        Arguments.of("Case#5: Input file list is empty..",
+            "keyTable".getBytes(UTF_8), Collections.emptyList(),
+            Arrays.asList("outputFile1", "outputFile2"), true),
+        Arguments.of("Case#6: Input and output file lists are same.",
+            "keyTable".getBytes(UTF_8),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"), true),
+        Arguments.of("Case#7: keyTable is relevant column family.",
+            "keyTable".getBytes(UTF_8),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"),
+            Arrays.asList("outputFile1", "outputFile2"), false),
+        Arguments.of("Case#8: directoryTable is relevant column family.",
+            "directoryTable".getBytes(UTF_8),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"),
+            Arrays.asList("outputFile1", "outputFile2"), false),
+        Arguments.of("Case#9: fileTable is relevant column family.",
+            "fileTable".getBytes(UTF_8),
+            Arrays.asList("inputFile1", "inputFile2", "inputFile3"),
+            Arrays.asList("outputFile1", "outputFile2"), false));
+  }
+
+  @MethodSource("shouldSkipFileCases")
+  @ParameterizedTest(name = "{0}")
+  public void testShouldSkipFile(String description,
+                                 byte[] columnFamilyBytes,
+                                 List<String> inputFiles,
+                                 List<String> outputFiles,
+                                 boolean expectedResult) {
+    assertEquals(expectedResult, rocksDBCheckpointDiffer
+        .shouldSkipCompaction(columnFamilyBytes, inputFiles, outputFiles));
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
index 97cbec94bd..4ad0c804f1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
 import org.apache.hadoop.hdds.utils.db.DBProfile;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -74,6 +75,7 @@ import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.ozone.rocksdiff.CompactionNode;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.SlowTest;
 import org.apache.ozone.test.UnhealthyTest;
@@ -110,6 +112,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static org.apache.commons.lang3.StringUtils.leftPad;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
 import static org.apache.hadoop.ozone.admin.scm.FinalizeUpgradeCommandUtil.isDone;
@@ -133,6 +136,7 @@ import static org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.Cancel
 import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.CANCELLED;
 import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
 import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;
 import static org.awaitility.Awaitility.with;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
@@ -2126,8 +2130,8 @@ public class TestOmSnapshot {
     if (response.getJobStatus() == DONE) {
       assertEquals(100, response.getSnapshotDiffReport().getDiffList().size());
     } else if (response.getJobStatus() == IN_PROGRESS) {
-      SnapshotDiffReportOzone diffReport =
-          fetchReportPage(snapshot1, snapshot2, null, 0);
+      SnapshotDiffReportOzone diffReport = fetchReportPage(volumeName,
+          bucketName, snapshot1, snapshot2, null, 0);
       assertEquals(100, diffReport.getDiffList().size());
     } else {
       fail("Unexpected job status for the test.");
@@ -2144,8 +2148,8 @@ public class TestOmSnapshot {
     String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(5);
     createSnapshots(snapshot1, snapshot2);
 
-    SnapshotDiffReportOzone diffReport = fetchReportPage(snapshot1, snapshot2,
-        null, pageSize);
+    SnapshotDiffReportOzone diffReport = fetchReportPage(volumeName,
+        bucketName, snapshot1, snapshot2, null, pageSize);
 
     List<DiffReportEntry> diffReportEntries = diffReport.getDiffList();
     String nextToken = diffReport.getToken();
@@ -2158,21 +2162,24 @@ public class TestOmSnapshot {
         until(() -> cluster.getOzoneManager().isRunning());
 
     while (nextToken == null || StringUtils.isNotEmpty(nextToken)) {
-      diffReport = fetchReportPage(snapshot1, snapshot2, nextToken, pageSize);
+      diffReport = fetchReportPage(volumeName, bucketName, snapshot1,
+          snapshot2, nextToken, pageSize);
       diffReportEntries.addAll(diffReport.getDiffList());
       nextToken = diffReport.getToken();
     }
     assertEquals(100, diffReportEntries.size());
   }
 
-  private SnapshotDiffReportOzone fetchReportPage(String fromSnapshot,
+  private SnapshotDiffReportOzone fetchReportPage(String volName,
+                                                  String buckName,
+                                                  String fromSnapshot,
                                                   String toSnapshot,
                                                   String token,
                                                   int pageSize)
       throws IOException, InterruptedException {
 
     while (true) {
-      SnapshotDiffResponse response = store.snapshotDiff(volumeName, bucketName,
+      SnapshotDiffResponse response = store.snapshotDiff(volName, buckName,
           fromSnapshot, toSnapshot, token, pageSize, forceFullSnapshotDiff,
           disableNativeDiff);
       if (response.getJobStatus() == IN_PROGRESS) {
@@ -2412,4 +2419,116 @@ public class TestOmSnapshot {
     fileKey.close();
   }
 
+  private String getKeySuffix(int index) {
+    return leftPad(Integer.toString(index), 10, "0");
+  }
+
+  // End-to-end test to verify that compaction DAG only tracks 'keyTable',
+  // 'directoryTable' and 'fileTable' column families. And only these
+  // column families are used in SST diff calculation.
+  @Test
+  public void testSnapshotCompactionDag() throws Exception {
+    String volume1 = "volume-1-" + RandomStringUtils.randomNumeric(5);
+    String bucket1 = "bucket-1-" + RandomStringUtils.randomNumeric(5);
+    String bucket2 = "bucket-2-" + RandomStringUtils.randomNumeric(5);
+    String bucket3 = "bucket-3-" + RandomStringUtils.randomNumeric(5);
+
+    store.createVolume(volume1);
+    OzoneVolume ozoneVolume = store.getVolume(volume1);
+    ozoneVolume.createBucket(bucket1);
+    OzoneBucket ozoneBucket1 = ozoneVolume.getBucket(bucket1);
+
+    DBStore activeDbStore = ozoneManager.getMetadataManager().getStore();
+
+    for (int i = 0; i < 100; i++) {
+      String keyName = "/dir1/dir2/dir3/key-" + getKeySuffix(i);
+      createFileKey(ozoneBucket1, keyName);
+    }
+
+    createSnapshot(volume1, bucket1, "bucket1-snap1");
+    activeDbStore.compactDB();
+
+    ozoneVolume.createBucket(bucket2);
+    OzoneBucket ozoneBucket2 = ozoneVolume.getBucket(bucket2);
+
+    for (int i = 100; i < 200; i++) {
+      String keyName = "/dir1/dir2/dir3/key-" + getKeySuffix(i);
+      createFileKey(ozoneBucket1, keyName);
+      createFileKey(ozoneBucket2, keyName);
+    }
+
+    createSnapshot(volume1, bucket1, "bucket1-snap2");
+    createSnapshot(volume1, bucket2, "bucket2-snap1");
+    activeDbStore.compactDB();
+
+    ozoneVolume.createBucket(bucket3);
+    OzoneBucket ozoneBucket3 = ozoneVolume.getBucket(bucket3);
+
+    for (int i = 200; i < 300; i++) {
+      String keyName = "/dir1/dir2/dir3/key-" + getKeySuffix(i);
+      createFileKey(ozoneBucket1, keyName);
+      createFileKey(ozoneBucket2, keyName);
+      createFileKey(ozoneBucket3, keyName);
+    }
+
+    createSnapshot(volume1, bucket1, "bucket1-snap3");
+    createSnapshot(volume1, bucket2, "bucket2-snap2");
+    createSnapshot(volume1, bucket3, "bucket3-snap1");
+    activeDbStore.compactDB();
+
+    for (int i = 300; i < 400; i++) {
+      String keyName = "/dir1/dir2/dir3/key-" + getKeySuffix(i);
+      createFileKey(ozoneBucket3, keyName);
+      createFileKey(ozoneBucket2, keyName);
+    }
+
+    createSnapshot(volume1, bucket2, "bucket2-snap3");
+    createSnapshot(volume1, bucket3, "bucket3-snap2");
+    activeDbStore.compactDB();
+
+    for (int i = 400; i < 500; i++) {
+      String keyName = "/dir1/dir2/dir3/key-" + getKeySuffix(i);
+      createFileKey(ozoneBucket3, keyName);
+    }
+
+    createSnapshot(volume1, bucket3, "bucket3-snap3");
+
+    List<CompactionNode> filteredNodes = ozoneManager.getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getCompactionNodeMap().values().stream()
+        .filter(node ->
+            !COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(node.getColumnFamily()))
+        .collect(Collectors.toList());
+
+    assertEquals(0, filteredNodes.size());
+
+    assertEquals(100,
+        fetchReportPage(volume1, bucket1, "bucket1-snap1", "bucket1-snap2",
+            null, 0).getDiffList().size());
+    assertEquals(100,
+        fetchReportPage(volume1, bucket1, "bucket1-snap2", "bucket1-snap3",
+            null, 0).getDiffList().size());
+    assertEquals(200,
+        fetchReportPage(volume1, bucket1, "bucket1-snap1", "bucket1-snap3",
+            null, 0).getDiffList().size());
+    assertEquals(100,
+        fetchReportPage(volume1, bucket2, "bucket2-snap1", "bucket2-snap2",
+            null, 0).getDiffList().size());
+    assertEquals(100,
+        fetchReportPage(volume1, bucket2, "bucket2-snap2", "bucket2-snap3",
+            null, 0).getDiffList().size());
+    assertEquals(200,
+        fetchReportPage(volume1, bucket2, "bucket2-snap1", "bucket2-snap3",
+            null, 0).getDiffList().size());
+    assertEquals(100,
+        fetchReportPage(volume1, bucket3, "bucket3-snap1", "bucket3-snap2",
+            null, 0).getDiffList().size());
+    assertEquals(100,
+        fetchReportPage(volume1, bucket3, "bucket3-snap2", "bucket3-snap3",
+            null, 0).getDiffList().size());
+    assertEquals(200,
+        fetchReportPage(volume1, bucket3, "bucket3-snap1", "bucket3-snap3",
+            null, 0).getDiffList().size());
+  }
 }
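
For reference, both the refactored toFileInfo() helper above and the new test in TestRocksDBCheckpointDiffer identify the column family that owns an SST file by opening it with SstFileReader and reading its table properties. Below is a standalone sketch of that lookup, assuming only the org.rocksdb API used in the patch; the class name SstColumnFamilyProbe and the file path are illustrative placeholders:

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.Options;
    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileReader;
    import org.rocksdb.SstFileReaderIterator;

    public final class SstColumnFamilyProbe {
      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        String sstFile = "/path/to/000123.sst";  // illustrative path
        try (Options options = new Options();
             ReadOptions readOptions = new ReadOptions();
             SstFileReader reader = new SstFileReader(options)) {
          reader.open(sstFile);
          // The owning column family is recorded in the SST's table properties.
          String columnFamily = new String(
              reader.getTableProperties().getColumnFamilyName(),
              StandardCharsets.UTF_8);
          // The first and last keys give the key range covered by the file.
          try (SstFileReaderIterator it = reader.newIterator(readOptions)) {
            it.seekToFirst();
            String startKey = new String(it.key(), StandardCharsets.UTF_8);
            it.seekToLast();
            String endKey = new String(it.key(), StandardCharsets.UTF_8);
            System.out.println(columnFamily + " [" + startKey + ", " + endKey + "]");
          }
        }
      }
    }

In the committed change this read is wrapped in a catch block, so an unreadable SST only leaves the corresponding compaction node without a key range and column family instead of failing the compaction listener.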

