hemantk-12 commented on code in PR #5083:
URL: https://github.com/apache/ozone/pull/5083#discussion_r1275521394


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1069,272 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(followerOM::isOmRpcServerRunning, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+
+    // verify that the bootstrap Follower OM can become leader
+    leaderOM.transferLeadership(followerNodeId);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        followerOM.checkLeaderStatus();
+        return true;
+      } catch (OMNotLeaderException | OMLeaderNotReadyException e) {
+        return false;
+      }
+    }, 1000, 10000);

Review Comment:
   nit: I think we can reduce the poll interval to 100ms?



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1069,272 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(followerOM::isOmRpcServerRunning, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+
+    // verify that the bootstrap Follower OM can become leader
+    leaderOM.transferLeadership(followerNodeId);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        followerOM.checkLeaderStatus();
+        return true;
+      } catch (OMNotLeaderException | OMLeaderNotReadyException e) {
+        return false;
+      }
+    }, 1000, 10000);
+    OzoneManager newLeaderOM = cluster.getOMLeader();
+    Assertions.assertEquals(followerOM, newLeaderOM);
+    OzoneManager newFollowerOM =
+        cluster.getOzoneManager(leaderOM.getOMNodeId());
+    Assertions.assertEquals(leaderOM, newFollowerOM);
+
+    checkSnapshot(newLeaderOM, newFollowerOM, snapshotName, keys, 
snapshotInfo);
+    readKeys(newKeys);
+
+    // Prepare baseline data for compaction logs
+    String currentCompactionLogPath = newLeaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getCurrentCompactionLogPath();
+    int lastIndex = currentCompactionLogPath.lastIndexOf(OM_KEY_PREFIX);
+    String compactionLogsPath = currentCompactionLogPath
+        .substring(0, lastIndex);
+    int numberOfLogFiles = 0;
+    long contentLength;
+    Path compactionLogPath = Paths.get(compactionLogsPath);
+    Path currentCompactionLog = Paths.get(currentCompactionLogPath);
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      contentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        numberOfLogFiles++;
+      }
+    }
+
+    // Check whether newly created snapshot gets processed by SFS
+    newKeys = writeKeys(1);
+    SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+    Assertions.assertNotNull(newSnapshot);
+    File omMetadataDir =
+        OMStorage.getOmDbDir(newLeaderOM.getConfiguration());
+    String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+    Path filePath =
+        Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+    Assertions.assertTrue(Files.exists(filePath));
+    GenericTestUtils.waitFor(() -> {
+      List<String> processedSnapshotIds;
+      try {
+        processedSnapshotIds = Files.readAllLines(filePath);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return processedSnapshotIds.contains(newSnapshot.getSnapshotId()
+          .toString());
+    }, 1000, 30000);
+
+    // Check whether newly created keys data is reclaimed
+    String newKey = OM_KEY_PREFIX + ozoneBucket.getVolumeName() +
+        OM_KEY_PREFIX + ozoneBucket.getName() +
+        OM_KEY_PREFIX + newKeys.get(0);
+    Table<String, OmKeyInfo> omKeyInfoTable = newLeaderOM
+        .getMetadataManager()
+        .getKeyTable(ozoneBucket.getBucketLayout());
+    OmKeyInfo newKeyInfo = omKeyInfoTable.get(newKey);
+    Assertions.assertNotNull(newKeyInfo);
+
+    long usedBytes = getUsedBytes(newLeaderOM);
+
+    ozoneBucket.deleteKeys(newKeys);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return getUsedBytes(newLeaderOM) ==
+            usedBytes - newKeyInfo.getDataSize();
+      } catch (IOException e) {
+        Assertions.fail();
+        return false;
+      }
+    }, 1000, 10000);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Objects.isNull(omKeyInfoTable.get(newKey));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether newly created snapshot data is reclaimed
+    client.getObjectStore()
+        .deleteSnapshot(volumeName, bucketName, newSnapshot.getName());
+    GenericTestUtils.waitFor(() -> {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          newLeaderOM.getMetadataManager().getSnapshotInfoTable();
+      try {
+        return null == snapshotInfoTable.get(newSnapshot.getTableKey());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether compaction logs get appeneded to by forcning compaction
+    newLeaderOM.getMetadataManager()
+        .getStore()
+        .compactDB();
+    int newNumberOfLogFiles = 0;
+    long newContentLength;
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      newContentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        newNumberOfLogFiles++;
+      }
+    }
+    Assertions.assertTrue(numberOfLogFiles != newNumberOfLogFiles
+        || contentLength != newContentLength);
+
+    // Check whether compaction backup files were pruned
+    final int finalNumberOfSstFiles = numberOfSstFiles;
+    GenericTestUtils.waitFor(() -> {
+      int newNumberOfSstFiles = 0;
+      try (DirectoryStream<Path> files =
+               Files.newDirectoryStream(sstBackupDirPath)) {
+        for (Path ignored : files) {
+          newNumberOfSstFiles++;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return finalNumberOfSstFiles > newNumberOfSstFiles;
+    }, 1000, 10000);
+
+    // Snap diff
+    String firstSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(10)).getName();
+    String diffKey = writeKeys(1).get(0);
+    String secondSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(10)).getName();
+    SnapshotDiffReportOzone diff = getSnapDiffReport(volumeName, bucketName,
+        firstSnapshot, secondSnapshot);
+    Assertions.assertEquals(Collections.singletonList(
+            SnapshotDiffReportOzone.getDiffReportEntry(
+                SnapshotDiffReport.DiffType.CREATE, diffKey, null)),
+        diff.getDiffList());
+  }
+
+  private long getUsedBytes(OzoneManager newLeaderOM) throws IOException {
+    return newLeaderOM
+        .getBucketManager()
+        .getBucketInfo(ozoneBucket.getVolumeName(), ozoneBucket.getName())
+        .getUsedBytes();
+  }
+
+  private SnapshotDiffReportOzone getSnapDiffReport(String volume,

Review Comment:
   nit: the alignment is a bit off.
   
   ```suggestion
     private SnapshotDiffReportOzone getSnapDiffReport(String volume,
                                                       String bucket,
                                                       String fromSnapshot,
                                                       String toSnapshot)
         throws InterruptedException, TimeoutException {
   ```
   
   Or 
   
   ```
     private SnapshotDiffReportOzone getSnapDiffReport(
         String volume,
         String bucket,
         String fromSnapshot,
         String toSnapshot
     ) throws InterruptedException, TimeoutException {
   ```
   
   FYI, you can set up the [Ozone style](https://github.com/apache/ozone/blob/master/hadoop-ozone/dev-support/intellij/ozone-style.xml) in your IDE.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1069,272 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+

Review Comment:
   nit: please remove the extra line.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1069,272 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(followerOM::isOmRpcServerRunning, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+
+    // verify that the bootstrap Follower OM can become leader
+    leaderOM.transferLeadership(followerNodeId);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        followerOM.checkLeaderStatus();
+        return true;
+      } catch (OMNotLeaderException | OMLeaderNotReadyException e) {
+        return false;
+      }
+    }, 1000, 10000);
+    OzoneManager newLeaderOM = cluster.getOMLeader();
+    Assertions.assertEquals(followerOM, newLeaderOM);
+    OzoneManager newFollowerOM =
+        cluster.getOzoneManager(leaderOM.getOMNodeId());
+    Assertions.assertEquals(leaderOM, newFollowerOM);
+
+    checkSnapshot(newLeaderOM, newFollowerOM, snapshotName, keys, 
snapshotInfo);
+    readKeys(newKeys);
+
+    // Prepare baseline data for compaction logs
+    String currentCompactionLogPath = newLeaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getCurrentCompactionLogPath();
+    int lastIndex = currentCompactionLogPath.lastIndexOf(OM_KEY_PREFIX);
+    String compactionLogsPath = currentCompactionLogPath
+        .substring(0, lastIndex);
+    int numberOfLogFiles = 0;
+    long contentLength;
+    Path compactionLogPath = Paths.get(compactionLogsPath);
+    Path currentCompactionLog = Paths.get(currentCompactionLogPath);
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      contentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        numberOfLogFiles++;
+      }
+    }
+
+    // Check whether newly created snapshot gets processed by SFS
+    newKeys = writeKeys(1);
+    SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+    Assertions.assertNotNull(newSnapshot);
+    File omMetadataDir =
+        OMStorage.getOmDbDir(newLeaderOM.getConfiguration());
+    String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+    Path filePath =
+        Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+    Assertions.assertTrue(Files.exists(filePath));
+    GenericTestUtils.waitFor(() -> {
+      List<String> processedSnapshotIds;
+      try {
+        processedSnapshotIds = Files.readAllLines(filePath);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return processedSnapshotIds.contains(newSnapshot.getSnapshotId()
+          .toString());
+    }, 1000, 30000);
+
+    // Check whether newly created keys data is reclaimed
+    String newKey = OM_KEY_PREFIX + ozoneBucket.getVolumeName() +
+        OM_KEY_PREFIX + ozoneBucket.getName() +
+        OM_KEY_PREFIX + newKeys.get(0);
+    Table<String, OmKeyInfo> omKeyInfoTable = newLeaderOM
+        .getMetadataManager()
+        .getKeyTable(ozoneBucket.getBucketLayout());
+    OmKeyInfo newKeyInfo = omKeyInfoTable.get(newKey);
+    Assertions.assertNotNull(newKeyInfo);
+
+    long usedBytes = getUsedBytes(newLeaderOM);
+
+    ozoneBucket.deleteKeys(newKeys);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return getUsedBytes(newLeaderOM) ==
+            usedBytes - newKeyInfo.getDataSize();
+      } catch (IOException e) {
+        Assertions.fail();
+        return false;
+      }
+    }, 1000, 10000);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Objects.isNull(omKeyInfoTable.get(newKey));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether newly created snapshot data is reclaimed
+    client.getObjectStore()
+        .deleteSnapshot(volumeName, bucketName, newSnapshot.getName());
+    GenericTestUtils.waitFor(() -> {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          newLeaderOM.getMetadataManager().getSnapshotInfoTable();
+      try {
+        return null == snapshotInfoTable.get(newSnapshot.getTableKey());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether compaction logs get appeneded to by forcning compaction
+    newLeaderOM.getMetadataManager()
+        .getStore()
+        .compactDB();
+    int newNumberOfLogFiles = 0;
+    long newContentLength;
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      newContentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        newNumberOfLogFiles++;
+      }
+    }
+    Assertions.assertTrue(numberOfLogFiles != newNumberOfLogFiles
+        || contentLength != newContentLength);
+
+    // Check whether compaction backup files were pruned
+    final int finalNumberOfSstFiles = numberOfSstFiles;
+    GenericTestUtils.waitFor(() -> {
+      int newNumberOfSstFiles = 0;
+      try (DirectoryStream<Path> files =
+               Files.newDirectoryStream(sstBackupDirPath)) {
+        for (Path ignored : files) {
+          newNumberOfSstFiles++;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return finalNumberOfSstFiles > newNumberOfSstFiles;
+    }, 1000, 10000);
+
+    // Snap diff
+    String firstSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(10)).getName();
+    String diffKey = writeKeys(1).get(0);
+    String secondSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(10)).getName();
+    SnapshotDiffReportOzone diff = getSnapDiffReport(volumeName, bucketName,
+        firstSnapshot, secondSnapshot);
+    Assertions.assertEquals(Collections.singletonList(
+            SnapshotDiffReportOzone.getDiffReportEntry(
+                SnapshotDiffReport.DiffType.CREATE, diffKey, null)),
+        diff.getDiffList());
+  }
+
+  private long getUsedBytes(OzoneManager newLeaderOM) throws IOException {

Review Comment:
   nit: just call it `leaderOm` or `ozoneManager`.
   ```suggestion
     private long getUsedBytes(OzoneManager leaderOm) throws IOException {
   ```



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1069,272 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(followerOM::isOmRpcServerRunning, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+
+    // verify that the bootstrap Follower OM can become leader
+    leaderOM.transferLeadership(followerNodeId);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        followerOM.checkLeaderStatus();
+        return true;
+      } catch (OMNotLeaderException | OMLeaderNotReadyException e) {
+        return false;
+      }
+    }, 1000, 10000);
+    OzoneManager newLeaderOM = cluster.getOMLeader();
+    Assertions.assertEquals(followerOM, newLeaderOM);
+    OzoneManager newFollowerOM =
+        cluster.getOzoneManager(leaderOM.getOMNodeId());
+    Assertions.assertEquals(leaderOM, newFollowerOM);
+
+    checkSnapshot(newLeaderOM, newFollowerOM, snapshotName, keys, 
snapshotInfo);
+    readKeys(newKeys);
+
+    // Prepare baseline data for compaction logs
+    String currentCompactionLogPath = newLeaderOM

Review Comment:
   Shouldn't this be after line 1117, just after you take the baseline of the SST files in the backup dir?



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1069,272 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(followerOM::isOmRpcServerRunning, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+
+    // verify that the bootstrap Follower OM can become leader
+    leaderOM.transferLeadership(followerNodeId);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        followerOM.checkLeaderStatus();
+        return true;
+      } catch (OMNotLeaderException | OMLeaderNotReadyException e) {
+        return false;
+      }
+    }, 1000, 10000);
+    OzoneManager newLeaderOM = cluster.getOMLeader();
+    Assertions.assertEquals(followerOM, newLeaderOM);
+    OzoneManager newFollowerOM =
+        cluster.getOzoneManager(leaderOM.getOMNodeId());
+    Assertions.assertEquals(leaderOM, newFollowerOM);
+
+    checkSnapshot(newLeaderOM, newFollowerOM, snapshotName, keys, 
snapshotInfo);
+    readKeys(newKeys);
+
+    // Prepare baseline data for compaction logs
+    String currentCompactionLogPath = newLeaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getCurrentCompactionLogPath();
+    int lastIndex = currentCompactionLogPath.lastIndexOf(OM_KEY_PREFIX);
+    String compactionLogsPath = currentCompactionLogPath
+        .substring(0, lastIndex);
+    int numberOfLogFiles = 0;
+    long contentLength;
+    Path compactionLogPath = Paths.get(compactionLogsPath);
+    Path currentCompactionLog = Paths.get(currentCompactionLogPath);
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      contentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        numberOfLogFiles++;
+      }
+    }
+
+    // Check whether newly created snapshot gets processed by SFS
+    newKeys = writeKeys(1);
+    SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+    Assertions.assertNotNull(newSnapshot);
+    File omMetadataDir =
+        OMStorage.getOmDbDir(newLeaderOM.getConfiguration());
+    String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+    Path filePath =
+        Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+    Assertions.assertTrue(Files.exists(filePath));
+    GenericTestUtils.waitFor(() -> {
+      List<String> processedSnapshotIds;
+      try {
+        processedSnapshotIds = Files.readAllLines(filePath);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return processedSnapshotIds.contains(newSnapshot.getSnapshotId()
+          .toString());
+    }, 1000, 30000);
+
+    // Check whether newly created keys data is reclaimed
+    String newKey = OM_KEY_PREFIX + ozoneBucket.getVolumeName() +
+        OM_KEY_PREFIX + ozoneBucket.getName() +
+        OM_KEY_PREFIX + newKeys.get(0);
+    Table<String, OmKeyInfo> omKeyInfoTable = newLeaderOM
+        .getMetadataManager()
+        .getKeyTable(ozoneBucket.getBucketLayout());
+    OmKeyInfo newKeyInfo = omKeyInfoTable.get(newKey);
+    Assertions.assertNotNull(newKeyInfo);
+
+    long usedBytes = getUsedBytes(newLeaderOM);
+
+    ozoneBucket.deleteKeys(newKeys);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return getUsedBytes(newLeaderOM) ==
+            usedBytes - newKeyInfo.getDataSize();
+      } catch (IOException e) {
+        Assertions.fail();
+        return false;
+      }
+    }, 1000, 10000);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Objects.isNull(omKeyInfoTable.get(newKey));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether newly created snapshot data is reclaimed
+    client.getObjectStore()
+        .deleteSnapshot(volumeName, bucketName, newSnapshot.getName());
+    GenericTestUtils.waitFor(() -> {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          newLeaderOM.getMetadataManager().getSnapshotInfoTable();
+      try {
+        return null == snapshotInfoTable.get(newSnapshot.getTableKey());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether compaction logs get appended to by forcing compaction
+    newLeaderOM.getMetadataManager()
+        .getStore()
+        .compactDB();
+    int newNumberOfLogFiles = 0;
+    long newContentLength;
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      newContentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        newNumberOfLogFiles++;
+      }
+    }
+    Assertions.assertTrue(numberOfLogFiles != newNumberOfLogFiles
+        || contentLength != newContentLength);
+
+    // Check whether compaction backup files were pruned
+    final int finalNumberOfSstFiles = numberOfSstFiles;
+    GenericTestUtils.waitFor(() -> {
+      int newNumberOfSstFiles = 0;
+      try (DirectoryStream<Path> files =
+               Files.newDirectoryStream(sstBackupDirPath)) {
+        for (Path ignored : files) {
+          newNumberOfSstFiles++;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return finalNumberOfSstFiles > newNumberOfSstFiles;
+    }, 1000, 10000);
+
+    // Snap diff
+    String firstSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(10)).getName();
+    String diffKey = writeKeys(1).get(0);
+    String secondSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(10)).getName();
+    SnapshotDiffReportOzone diff = getSnapDiffReport(volumeName, bucketName,
+        firstSnapshot, secondSnapshot);
+    Assertions.assertEquals(Collections.singletonList(
+            SnapshotDiffReportOzone.getDiffReportEntry(
+                SnapshotDiffReport.DiffType.CREATE, diffKey, null)),
+        diff.getDiffList());
+  }
+
+  private long getUsedBytes(OzoneManager newLeaderOM) throws IOException {
+    return newLeaderOM
+        .getBucketManager()
+        .getBucketInfo(ozoneBucket.getVolumeName(), ozoneBucket.getName())
+        .getUsedBytes();
+  }
+
+  private SnapshotDiffReportOzone getSnapDiffReport(String volume,
+      String bucket,
+      String fromSnapshot,
+      String toSnapshot)
+      throws InterruptedException, TimeoutException {
+    AtomicReference<SnapshotDiffResponse> response = new AtomicReference<>();
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        response.set(client.getObjectStore()
+            .snapshotDiff(
+                volume, bucket, fromSnapshot, toSnapshot, null, 0, false,
+                false));
+        return response.get().getJobStatus() == DONE;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);

Review Comment:
   nit: You can also use `SnapshotDiffResponse`'s waitTime for polling.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1069,272 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(followerOM::isOmRpcServerRunning, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+
+    // verify that the bootstrap Follower OM can become leader
+    leaderOM.transferLeadership(followerNodeId);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        followerOM.checkLeaderStatus();
+        return true;
+      } catch (OMNotLeaderException | OMLeaderNotReadyException e) {
+        return false;
+      }
+    }, 1000, 10000);
+    OzoneManager newLeaderOM = cluster.getOMLeader();
+    Assertions.assertEquals(followerOM, newLeaderOM);
+    OzoneManager newFollowerOM =
+        cluster.getOzoneManager(leaderOM.getOMNodeId());
+    Assertions.assertEquals(leaderOM, newFollowerOM);
+
+    checkSnapshot(newLeaderOM, newFollowerOM, snapshotName, keys, 
snapshotInfo);
+    readKeys(newKeys);
+
+    // Prepare baseline data for compaction logs
+    String currentCompactionLogPath = newLeaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getCurrentCompactionLogPath();
+    int lastIndex = currentCompactionLogPath.lastIndexOf(OM_KEY_PREFIX);
+    String compactionLogsPath = currentCompactionLogPath
+        .substring(0, lastIndex);
+    int numberOfLogFiles = 0;
+    long contentLength;
+    Path compactionLogPath = Paths.get(compactionLogsPath);
+    Path currentCompactionLog = Paths.get(currentCompactionLogPath);
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      contentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        numberOfLogFiles++;
+      }
+    }
+
+    // Check whether newly created snapshot gets processed by SFS
+    newKeys = writeKeys(1);
+    SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+    Assertions.assertNotNull(newSnapshot);
+    File omMetadataDir =
+        OMStorage.getOmDbDir(newLeaderOM.getConfiguration());
+    String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+    Path filePath =
+        Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+    Assertions.assertTrue(Files.exists(filePath));
+    GenericTestUtils.waitFor(() -> {
+      List<String> processedSnapshotIds;
+      try {
+        processedSnapshotIds = Files.readAllLines(filePath);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return processedSnapshotIds.contains(newSnapshot.getSnapshotId()
+          .toString());
+    }, 1000, 30000);
+
+    // Check whether newly created keys data is reclaimed
+    String newKey = OM_KEY_PREFIX + ozoneBucket.getVolumeName() +
+        OM_KEY_PREFIX + ozoneBucket.getName() +
+        OM_KEY_PREFIX + newKeys.get(0);
+    Table<String, OmKeyInfo> omKeyInfoTable = newLeaderOM
+        .getMetadataManager()
+        .getKeyTable(ozoneBucket.getBucketLayout());
+    OmKeyInfo newKeyInfo = omKeyInfoTable.get(newKey);
+    Assertions.assertNotNull(newKeyInfo);
+
+    long usedBytes = getUsedBytes(newLeaderOM);
+
+    ozoneBucket.deleteKeys(newKeys);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return getUsedBytes(newLeaderOM) ==
+            usedBytes - newKeyInfo.getDataSize();
+      } catch (IOException e) {
+        Assertions.fail();
+        return false;
+      }
+    }, 1000, 10000);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Objects.isNull(omKeyInfoTable.get(newKey));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether newly created snapshot data is reclaimed
+    client.getObjectStore()
+        .deleteSnapshot(volumeName, bucketName, newSnapshot.getName());
+    GenericTestUtils.waitFor(() -> {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          newLeaderOM.getMetadataManager().getSnapshotInfoTable();
+      try {
+        return null == snapshotInfoTable.get(newSnapshot.getTableKey());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether compaction logs get appended to by forcing compaction
+    newLeaderOM.getMetadataManager()
+        .getStore()
+        .compactDB();
+    int newNumberOfLogFiles = 0;
+    long newContentLength;
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      newContentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        newNumberOfLogFiles++;
+      }
+    }
+    Assertions.assertTrue(numberOfLogFiles != newNumberOfLogFiles
+        || contentLength != newContentLength);
+
+    // Check whether compaction backup files were pruned
+    final int finalNumberOfSstFiles = numberOfSstFiles;
+    GenericTestUtils.waitFor(() -> {
+      int newNumberOfSstFiles = 0;
+      try (DirectoryStream<Path> files =
+               Files.newDirectoryStream(sstBackupDirPath)) {
+        for (Path ignored : files) {
+          newNumberOfSstFiles++;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return finalNumberOfSstFiles > newNumberOfSstFiles;
+    }, 1000, 10000);
+
+    // Snap diff
+    String firstSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(10)).getName();
+    String diffKey = writeKeys(1).get(0);
+    String secondSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(10)).getName();
+    SnapshotDiffReportOzone diff = getSnapDiffReport(volumeName, bucketName,

Review Comment:
   Do we also need to check the correctness of snapshots created before the 
bootstrapping and transfer of leadership? I don't see us doing any 
validation of the SST files as such. Please correct me if I'm wrong.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1069,272 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(followerOM::isOmRpcServerRunning, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+
+    // verify that the bootstrap Follower OM can become leader
+    leaderOM.transferLeadership(followerNodeId);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        followerOM.checkLeaderStatus();
+        return true;
+      } catch (OMNotLeaderException | OMLeaderNotReadyException e) {
+        return false;
+      }
+    }, 1000, 10000);
+    OzoneManager newLeaderOM = cluster.getOMLeader();
+    Assertions.assertEquals(followerOM, newLeaderOM);
+    OzoneManager newFollowerOM =
+        cluster.getOzoneManager(leaderOM.getOMNodeId());
+    Assertions.assertEquals(leaderOM, newFollowerOM);
+
+    checkSnapshot(newLeaderOM, newFollowerOM, snapshotName, keys, 
snapshotInfo);
+    readKeys(newKeys);
+
+    // Prepare baseline data for compaction logs
+    String currentCompactionLogPath = newLeaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getCurrentCompactionLogPath();
+    int lastIndex = currentCompactionLogPath.lastIndexOf(OM_KEY_PREFIX);
+    String compactionLogsPath = currentCompactionLogPath
+        .substring(0, lastIndex);
+    int numberOfLogFiles = 0;
+    long contentLength;
+    Path compactionLogPath = Paths.get(compactionLogsPath);
+    Path currentCompactionLog = Paths.get(currentCompactionLogPath);
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      contentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        numberOfLogFiles++;
+      }
+    }
+
+    // Check whether newly created snapshot gets processed by SFS
+    newKeys = writeKeys(1);
+    SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+    Assertions.assertNotNull(newSnapshot);
+    File omMetadataDir =
+        OMStorage.getOmDbDir(newLeaderOM.getConfiguration());
+    String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+    Path filePath =
+        Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+    Assertions.assertTrue(Files.exists(filePath));
+    GenericTestUtils.waitFor(() -> {
+      List<String> processedSnapshotIds;
+      try {
+        processedSnapshotIds = Files.readAllLines(filePath);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return processedSnapshotIds.contains(newSnapshot.getSnapshotId()
+          .toString());
+    }, 1000, 30000);
+
+    // Check whether newly created keys data is reclaimed
+    String newKey = OM_KEY_PREFIX + ozoneBucket.getVolumeName() +
+        OM_KEY_PREFIX + ozoneBucket.getName() +
+        OM_KEY_PREFIX + newKeys.get(0);
+    Table<String, OmKeyInfo> omKeyInfoTable = newLeaderOM
+        .getMetadataManager()
+        .getKeyTable(ozoneBucket.getBucketLayout());
+    OmKeyInfo newKeyInfo = omKeyInfoTable.get(newKey);
+    Assertions.assertNotNull(newKeyInfo);
+
+    long usedBytes = getUsedBytes(newLeaderOM);
+
+    ozoneBucket.deleteKeys(newKeys);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return getUsedBytes(newLeaderOM) ==
+            usedBytes - newKeyInfo.getDataSize();
+      } catch (IOException e) {
+        Assertions.fail();
+        return false;
+      }
+    }, 1000, 10000);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Objects.isNull(omKeyInfoTable.get(newKey));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether newly created snapshot data is reclaimed
+    client.getObjectStore()
+        .deleteSnapshot(volumeName, bucketName, newSnapshot.getName());
+    GenericTestUtils.waitFor(() -> {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          newLeaderOM.getMetadataManager().getSnapshotInfoTable();
+      try {
+        return null == snapshotInfoTable.get(newSnapshot.getTableKey());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether compaction logs get appeneded to by forcning compaction

Review Comment:
   ```suggestion
       // Check whether compaction logs get appended to by forcing compaction
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to