Xushaohong commented on code in PR #4294:
URL: https://github.com/apache/ozone/pull/4294#discussion_r1174900149


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -305,6 +312,236 @@ public void testInstallSnapshot() throws Exception {
     Assertions.assertTrue(hardLinkCount > 0, "No hard links were found");
   }
 
+  @Test
+  @Timeout(300)
+  public void testInstallIncrementalSnapshot() throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Set fault injector to pause before install
+    FaultInjector faultInjector = new SnapshotPauseInjector();
+    followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
+    // Do some transactions so that the log index increases
+    List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        80);
+
+    // Start the inactive OM. Checkpoint installation will happen spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // Wait the follower download the snapshot,but get stuck by injector
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmSnapshotProvider().getNumDownloaded() == 1;
+    }, 1000, 10000);
+
+    // Do some transactions, let leader OM take a new snapshot and purge the
+    // old logs, so that follower must download the new snapshot again.
+    List<String> secondKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        160);
+
+    // Resume the follower thread, it would download the incremental snapshot.
+    faultInjector.resume();
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+          >= leaderOMSnapshotIndex - 1;
+    }, 1000, 10000);
+
+    assertEquals(2, followerOM.getOmSnapshotProvider().getNumDownloaded());
+
+    // Verify that the follower OM's DB contains the transactions which were
+    // made while it was inactive.
+    OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+    assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+        followerOMMetaMngr.getVolumeKey(volumeName)));
+    assertNotNull(followerOMMetaMngr.getBucketTable().get(
+        followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+
+    for (String key : firstKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+    for (String key : secondKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+
+    // Verify the metrics recording the incremental checkpoint at leader side
+    DBCheckpointMetrics dbMetrics = leaderOM.getMetrics().
+        getDBCheckpointMetrics();
+    Assertions.assertTrue(
+        dbMetrics.getLastCheckpointStreamingNumSSTExcluded() > 0);
+    assertEquals(1, dbMetrics.getNumIncrementalCheckpoints());
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.isOmRpcServerRunning();
+    }, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+    assertNotNull(followerOMMetaMngr.getKeyTable(
+        TEST_BUCKET_LAYOUT).get(followerOMMetaMngr.getOzoneKey(
+        volumeName, bucketName, newKeys.get(0))));
+
+    // Verify follower candidate directory get cleaned
+    String[] filesInCandidate = followerOM.getOmSnapshotProvider().
+        getCandidateDir().list();
+    assertNotNull(filesInCandidate);
+    assertEquals(0, filesInCandidate.length);
+  }
+
+  @Test
+  @Timeout(300)
+  public void testInstallIncrementalSnapshotWithFailure() throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Set fault injector to pause before install
+    FaultInjector faultInjector = new SnapshotPauseInjector();
+    followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
+    // Do some transactions so that the log index increases
+    List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        80);
+
+    // Start the inactive OM. Checkpoint installation will happen spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // Wait the follower download the snapshot,but get stuck by injector
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmSnapshotProvider().getNumDownloaded() == 1;
+    }, 1000, 10000);
+
+    // Do some transactions, let leader OM take a new snapshot and purge the
+    // old logs, so that follower must download the new snapshot again.
+    List<String> secondKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        160);
+
+    // Resume the follower thread, it would download the incremental snapshot.
+    faultInjector.resume();
+
+    // Pause the follower thread again to block the second-time install
+    faultInjector.reset();
+
+    // Wait the follower download the incremental snapshot, but get stuck
+    // by injector
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmSnapshotProvider().getNumDownloaded() == 2;
+    }, 1000, 10000);
+
+    // Corrupt the mixed checkpoint in the candidate DB dir
+    File followerCandidateDir = followerOM.getOmSnapshotProvider().
+        getCandidateDir();
+    List<String> sstList = HAUtils.getExistingSstFiles(followerCandidateDir);
+    Assertions.assertTrue(sstList.size() > 0);
+    Collections.shuffle(sstList);
+    List<String> victimSstList = sstList.subList(0, sstList.size() / 3);
+    for (String sst: victimSstList) {
+      File victimSst = new File(followerCandidateDir, sst);
+      Assertions.assertTrue(victimSst.delete());
+    }
+
+    // Resume the follower thread, it would download the full snapshot again
+    // as the installation will fail for the corruption detected.
+    faultInjector.resume();
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 10s
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+          >= leaderOMSnapshotIndex - 1;
+    }, 1000, 10000);
+
+    // Verify that the follower OM's DB contains the transactions which were
+    // made while it was inactive.
+    OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+    assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+        followerOMMetaMngr.getVolumeKey(volumeName)));
+    assertNotNull(followerOMMetaMngr.getBucketTable().get(
+        followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+
+    // Verify that the follower OM's DB contains the transactions which were
+    // made while it was inactive.
+    for (String key : firstKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+    for (String key : secondKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+
+    // Verify the metrics
+    DBCheckpointMetrics dbMetrics = leaderOM.getMetrics().
+        getDBCheckpointMetrics();
+    assertEquals(0, dbMetrics.getLastCheckpointStreamingNumSSTExcluded());
+    assertTrue(dbMetrics.getNumIncrementalCheckpoints() >= 1);
+    assertTrue(dbMetrics.getNumCheckpoints() >= 2);

Review Comment:
   Yes, good catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to