mladjan-gadzic commented on code in PR #5083:
URL: https://github.com/apache/ozone/pull/5083#discussion_r1283531068


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -1036,6 +1075,359 @@ public void testInstallCorruptedCheckpointFailure() throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  /**
+   * Goal of this test is to check whether background services work after
+   * leadership transfer.
+   * Services tested:
+   * -- SST filtering
+   * -- key deletion
+   * -- snapshot deletion
+   * -- compaction backup pruning
+   * On top of that there are some simple tests to confirm system integrity.
+   */
+  @Test
+  @DisplayName("testSnapshotBackgroundServices")
+  @SuppressWarnings("methodlength")
+  public void testSnapshotBackgroundServices()
+      throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < 10;
+         snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
+
+    // Prepare baseline data for compaction backup pruning
+    String sstBackupDir = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getSSTBackupDir();
+    Assertions.assertNotNull(sstBackupDir);
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    int numberOfSstFiles = 0;
+    try (DirectoryStream<Path> files =
+             Files.newDirectoryStream(sstBackupDirPath)) {
+      for (Path ignored : files) {
+        numberOfSstFiles++;
+      }
+    }
+
+    // Prepare baseline data for compaction logs
+    String currentCompactionLogPath = leaderOM
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer()
+        .getCurrentCompactionLogPath();
+    int lastIndex = currentCompactionLogPath.lastIndexOf(OM_KEY_PREFIX);
+    String compactionLogsPath = currentCompactionLogPath
+        .substring(0, lastIndex);
+    int numberOfLogFiles = 0;
+    long contentLength;
+    Path compactionLogPath = Paths.get(compactionLogsPath);
+    Path currentCompactionLog = Paths.get(currentCompactionLogPath);
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      contentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        numberOfLogFiles++;
+      }
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Start the inactive OM. Checkpoint installation will happen spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait for the follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+            >= leaderOMSnapshotIndex - 1, 100, 10000);
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(followerOM::isOmRpcServerRunning, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+
+    // verify that the bootstrap Follower OM can become leader
+    leaderOM.transferLeadership(followerNodeId);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        followerOM.checkLeaderStatus();
+        return true;
+      } catch (OMNotLeaderException | OMLeaderNotReadyException e) {
+        return false;
+      }
+    }, 100, 10000);
+    OzoneManager newLeaderOM = cluster.getOMLeader();
+    Assertions.assertEquals(followerOM, newLeaderOM);
+    OzoneManager newFollowerOM =
+        cluster.getOzoneManager(leaderOM.getOMNodeId());
+    Assertions.assertEquals(leaderOM, newFollowerOM);
+
+    readKeys(newKeys);
+
+    // Check whether newly created snapshot gets processed by SFS
+    writeKeys(1);
+    SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+    Assertions.assertNotNull(newSnapshot);
+    File omMetadataDir =
+        OMStorage.getOmDbDir(newLeaderOM.getConfiguration());
+    String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+    Path filePath =
+        Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+    Assertions.assertTrue(Files.exists(filePath));
+    GenericTestUtils.waitFor(() -> {
+      List<String> processedSnapshotIds;
+      try {
+        processedSnapshotIds = Files.readAllLines(filePath);
+      } catch (IOException e) {
+        Assertions.fail();
+        return false;
+      }
+      return processedSnapshotIds.contains(newSnapshot.getSnapshotId()
+          .toString());
+    }, 1000, 30000);
+
+    /*
+      Check whether newly created key data is reclaimed
+      create key a
+      create snapshot b
+      delete key a
+      create snapshot c
+      assert that a is in c's deleted table
+      create snapshot d
+      delete snapshot c
+      wait until key a appears in deleted table of d.
+    */
+    // create key a
+    String keyNameA = writeKeys(1).get(0);
+    String keyA = OM_KEY_PREFIX + ozoneBucket.getVolumeName() +
+        OM_KEY_PREFIX + ozoneBucket.getName() +
+        OM_KEY_PREFIX + keyNameA;
+    Table<String, OmKeyInfo> omKeyInfoTable = newLeaderOM
+        .getMetadataManager()
+        .getKeyTable(ozoneBucket.getBucketLayout());
+    OmKeyInfo keyInfoA = omKeyInfoTable.get(keyA);
+    Assertions.assertNotNull(keyInfoA);
+
+    // create snapshot b
+    SnapshotInfo snapshotInfoB = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+    Assertions.assertNotNull(snapshotInfoB);
+
+    // delete key a
+    ozoneBucket.deleteKey(keyNameA);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Objects.isNull(omKeyInfoTable.get(keyA));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // create snapshot c
+    SnapshotInfo snapshotInfoC = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+
+    // get snapshot c
+    OmSnapshot snapC;
+    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcC = newLeaderOM
+        .getOmSnapshotManager()
+        .checkForSnapshot(volumeName, bucketName,
+            getSnapshotPrefix(snapshotInfoC.getName()), true)) {
+      Assertions.assertNotNull(rcC);
+      snapC = (OmSnapshot) rcC.get();
+    }
+
+    // assert that key a is in snapshot c's deleted table
+    GenericTestUtils.waitFor(() -> {
+      try (TableIterator<String, ? extends Table.KeyValue<String,
+          RepeatedOmKeyInfo>> iterator =
+               snapC.getMetadataManager().getDeletedTable().iterator()) {
+        while (iterator.hasNext()) {
+          if (iterator.next().getKey().contains(keyA)) {
+            return true;
+          }
+        }
+
+        return false;
+      } catch (IOException e) {
+        Assertions.fail();
+        return false;
+      }
+    }, 1000, 10000);
+
+    // create snapshot d
+    SnapshotInfo snapshotInfoD = createOzoneSnapshot(newLeaderOM,
+        snapshotNamePrefix + RandomStringUtils.randomNumeric(5));
+
+    // delete snapshot c
+    client.getObjectStore()
+        .deleteSnapshot(volumeName, bucketName, snapshotInfoC.getName());
+
+    GenericTestUtils.waitFor(() -> {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          newLeaderOM.getMetadataManager().getSnapshotInfoTable();
+      try {
+        return null == snapshotInfoTable.get(snapshotInfoC.getTableKey());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // get snapshot d
+    OmSnapshot snapD;
+    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcD = newLeaderOM
+        .getOmSnapshotManager()
+        .checkForSnapshot(volumeName, bucketName,
+            getSnapshotPrefix(snapshotInfoD.getName()), true)) {
+      Assertions.assertNotNull(rcD);
+      snapD = (OmSnapshot) rcD.get();
+    }
+
+    // wait until key a appears in deleted table of snapshot d
+    GenericTestUtils.waitFor(() -> {
+      try (TableIterator<String, ? extends Table.KeyValue<String,
+          RepeatedOmKeyInfo>> iterator =
+               snapD.getMetadataManager().getDeletedTable().iterator()) {
+        while (iterator.hasNext()) {
+          Table.KeyValue<String, RepeatedOmKeyInfo> next = iterator.next();
+          if (next.getKey().contains(keyA)) {
+            return true;
+          }
+        }
+
+        return false;
+      } catch (IOException e) {
+        Assertions.fail();
+        return false;
+      }
+    }, 1000, 120000);
+
+    // Check whether newly created snapshot data is reclaimed
+    client.getObjectStore()
+        .deleteSnapshot(volumeName, bucketName, newSnapshot.getName());
+    GenericTestUtils.waitFor(() -> {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          newLeaderOM.getMetadataManager().getSnapshotInfoTable();
+      try {
+        return null == snapshotInfoTable.get(newSnapshot.getTableKey());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 10000);
+
+    // Check whether compaction logs get appended to by forcing compaction
+    newLeaderOM.getMetadataManager()
+        .getStore()
+        .compactDB();
+    int newNumberOfLogFiles = 0;
+    long newContentLength;
+    try (BufferedReader bufferedReader =
+             Files.newBufferedReader(currentCompactionLog);
+         DirectoryStream<Path> files =
+             Files.newDirectoryStream(compactionLogPath)) {
+      newContentLength = bufferedReader.lines()
+          .mapToLong(String::length)
+          .reduce(0L, Long::sum);
+      for (Path ignored : files) {
+        newNumberOfLogFiles++;
+      }
+    }
+    Assertions.assertTrue(numberOfLogFiles != newNumberOfLogFiles
+        || contentLength != newContentLength);
+
+    // Check whether compaction backup files were pruned
+    final int finalNumberOfSstFiles = numberOfSstFiles;
+    GenericTestUtils.waitFor(() -> {
+      int newNumberOfSstFiles = 0;
+      try (DirectoryStream<Path> files =
+               Files.newDirectoryStream(sstBackupDirPath)) {
+        for (Path ignored : files) {
+          newNumberOfSstFiles++;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return finalNumberOfSstFiles > newNumberOfSstFiles;
+    }, 1000, 10000);
+
+    // Snap diff

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to