prashantpogde commented on code in PR #7200:
URL: https://github.com/apache/ozone/pull/7200#discussion_r1767408228
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingService.java:
##########
@@ -454,6 +471,190 @@ public void testSnapshotWithFSO() throws Exception {
rcSnap1.close();
}
+ private DirectoryDeletingService
getMockedDirectoryDeletingService(AtomicBoolean dirDeletionWaitStarted,
+
AtomicBoolean dirDeletionStarted)
+ throws InterruptedException, TimeoutException {
+ OzoneManager ozoneManager = Mockito.spy(om);
+ om.getKeyManager().getDirDeletingService().shutdown();
+ GenericTestUtils.waitFor(() ->
om.getKeyManager().getDirDeletingService().getThreadCount() == 0, 1000,
+ 100000);
+ DirectoryDeletingService directoryDeletingService = Mockito.spy(new
DirectoryDeletingService(10000,
+ TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf()));
+ directoryDeletingService.shutdown();
+ GenericTestUtils.waitFor(() -> directoryDeletingService.getThreadCount()
== 0, 1000,
+ 100000);
+ when(ozoneManager.getMetadataManager()).thenAnswer(i -> {
+ // Wait for SDS to reach DDS wait block before processing any deleted
directories.
+ GenericTestUtils.waitFor(dirDeletionWaitStarted::get, 1000, 100000);
+ dirDeletionStarted.set(true);
+ return i.callRealMethod();
+ });
+ return directoryDeletingService;
+ }
+
+ private KeyDeletingService getMockedKeyDeletingService(AtomicBoolean
keyDeletionWaitStarted,
+ AtomicBoolean
keyDeletionStarted)
+ throws InterruptedException, TimeoutException, IOException {
+ OzoneManager ozoneManager = Mockito.spy(om);
+ om.getKeyManager().getDeletingService().shutdown();
+ GenericTestUtils.waitFor(() ->
om.getKeyManager().getDeletingService().getThreadCount() == 0, 1000,
+ 100000);
+ KeyManager keyManager = Mockito.spy(om.getKeyManager());
+ when(ozoneManager.getKeyManager()).thenReturn(keyManager);
+ KeyDeletingService keyDeletingService = Mockito.spy(new
KeyDeletingService(ozoneManager,
+ ozoneManager.getScmClient().getBlockClient(), keyManager, 10000,
+ 100000, cluster.getConf()));
+ keyDeletingService.shutdown();
+ GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0,
1000,
+ 100000);
+ when(keyManager.getPendingDeletionKeys(anyInt())).thenAnswer(i -> {
+ // wait for SDS to reach the KDS wait block before processing any key.
+ GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000);
+ keyDeletionStarted.set(true);
+ return i.callRealMethod();
+ });
+ return keyDeletingService;
+ }
+
+ private SnapshotDeletingService
getMockedSnapshotDeletingService(KeyDeletingService keyDeletingService,
+
DirectoryDeletingService directoryDeletingService,
+
AtomicBoolean snapshotDeletionStarted,
+
AtomicBoolean keyDeletionWaitStarted,
+
AtomicBoolean dirDeletionWaitStarted,
+
AtomicBoolean keyDeletionStarted,
+
AtomicBoolean dirDeletionStarted)
+ throws InterruptedException, TimeoutException, IOException {
+ OzoneManager ozoneManager = Mockito.spy(om);
+ om.getKeyManager().getSnapshotDeletingService().shutdown();
+ GenericTestUtils.waitFor(() ->
om.getKeyManager().getSnapshotDeletingService().getThreadCount() == 0, 1000,
+ 100000);
+ KeyManager keyManager = Mockito.spy(om.getKeyManager());
+ OmSnapshotManager omSnapshotManager =
Mockito.spy(om.getOmSnapshotManager());
+ when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
+ when(ozoneManager.getKeyManager()).thenReturn(keyManager);
+ when(keyManager.getDeletingService()).thenReturn(keyDeletingService);
+
when(keyManager.getDirDeletingService()).thenReturn(directoryDeletingService);
+ SnapshotDeletingService snapshotDeletingService = Mockito.spy(new
SnapshotDeletingService(10000,
+ 100000, ozoneManager));
+ snapshotDeletingService.shutdown();
+ GenericTestUtils.waitFor(() -> snapshotDeletingService.getThreadCount() ==
0, 1000,
+ 100000);
+ doAnswer(i -> {
+ // KDS wait block reached in SDS.
+ GenericTestUtils.waitFor(keyDeletingService::isRunningOnAOS, 1000,
100000);
+ keyDeletionWaitStarted.set(true);
+ return i.callRealMethod();
+ }).when(snapshotDeletingService).waitForKeyDeletingService();
+ doAnswer(i -> {
+ // DDS wait block reached in SDS.
+ GenericTestUtils.waitFor(directoryDeletingService::isRunningOnAOS, 1000,
100000);
+ dirDeletionWaitStarted.set(true);
+ return i.callRealMethod();
+ }).when(snapshotDeletingService).waitForDirDeletingService();
+ doAnswer(i -> {
+ // Assert KDS & DDS is not running when SDS starts moving entries &
assert all wait block, KDS processing
+ // AOS block & DDS AOS block have been executed.
+ Assertions.assertTrue(keyDeletionWaitStarted.get());
+ Assertions.assertTrue(dirDeletionWaitStarted.get());
+ Assertions.assertTrue(keyDeletionStarted.get());
+ Assertions.assertTrue(dirDeletionStarted.get());
+ Assertions.assertFalse(keyDeletingService.isRunningOnAOS());
+ Assertions.assertFalse(directoryDeletingService.isRunningOnAOS());
+ snapshotDeletionStarted.set(true);
+ return i.callRealMethod();
+ }).when(omSnapshotManager).getSnapshot(anyString(), anyString(),
anyString());
+ return snapshotDeletingService;
+ }
+
+ @Test
+ @Order(4)
+ public void testParallelExcecutionOfKeyDeletionAndSnapshotDeletion() throws
Exception {
+ AtomicBoolean keyDeletionWaitStarted = new AtomicBoolean(false);
+ AtomicBoolean dirDeletionWaitStarted = new AtomicBoolean(false);
+ AtomicBoolean keyDeletionStarted = new AtomicBoolean(false);
+ AtomicBoolean dirDeletionStarted = new AtomicBoolean(false);
+ AtomicBoolean snapshotDeletionStarted = new AtomicBoolean(false);
+ // mock keyDeletingService
+ KeyDeletingService keyDeletingService =
getMockedKeyDeletingService(keyDeletionWaitStarted, keyDeletionStarted);
+
+ // mock dirDeletingService
+ DirectoryDeletingService directoryDeletingService =
getMockedDirectoryDeletingService(dirDeletionWaitStarted,
+ dirDeletionStarted);
+
+ // mock snapshotDeletingService.
+ SnapshotDeletingService snapshotDeletingService =
getMockedSnapshotDeletingService(keyDeletingService,
+ directoryDeletingService, snapshotDeletionStarted,
keyDeletionWaitStarted, dirDeletionWaitStarted,
+ keyDeletionStarted, dirDeletionStarted);
+
+ String bucketName = "bucket" + new Random().nextInt();
+ BucketArgs bucketArgs = new BucketArgs.Builder()
+ .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+ .build();
+ OzoneBucket testBucket = TestDataUtil.createBucket(
+ client, VOLUME_NAME, bucketArgs, bucketName);
+ createSnapshotFSODataForBucket(testBucket);
+ List<Table.KeyValue<String, String>> renamesKeyEntries;
+ List<Table.KeyValue<String, List<OmKeyInfo>>> deletedKeyEntries;
+ List<Table.KeyValue<String, OmKeyInfo>> deletedDirEntries;
+ try (ReferenceCounted<OmSnapshot> snapshot =
om.getOmSnapshotManager().getSnapshot(testBucket.getVolumeName(),
+ testBucket.getName(), testBucket.getName() + "snap2")) {
+ renamesKeyEntries =
snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(),
+ testBucket.getName(), "", 1000);
+ deletedKeyEntries =
snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(),
+ testBucket.getName(), "", 1000);
+ deletedDirEntries =
snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(),
+ testBucket.getName(), 1000);
+ }
+ Thread keyDeletingThread = new Thread(() -> {
+ try {
+ keyDeletingService.runPeriodicalTaskNow();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Thread directoryDeletingThread = new Thread(() -> {
+ try {
+ directoryDeletingService.runPeriodicalTaskNow();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ ExecutorService snapshotDeletingThread = Executors.newFixedThreadPool(1);
+ Runnable snapshotDeletionRunnable = () -> {
+ try {
+ snapshotDeletingService.runPeriodicalTaskNow();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ keyDeletingThread.start();
+ directoryDeletingThread.start();
+ Future<?> future = snapshotDeletingThread.submit(snapshotDeletionRunnable);
+ GenericTestUtils.waitFor(snapshotDeletionStarted::get, 1000, 30000);
+ future.get();
+ Mockito.reset(snapshotDeletingService);
+ SnapshotInfo snap2 = SnapshotUtils.getSnapshotInfo(om,
testBucket.getVolumeName(),
+ testBucket.getName(), testBucket.getName() + "snap2");
+ Assertions.assertEquals(snap2.getSnapshotStatus(),
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
+ future = snapshotDeletingThread.submit(snapshotDeletionRunnable);
+ future.get();
+ Assertions.assertThrows(IOException.class, () ->
SnapshotUtils.getSnapshotInfo(om, testBucket.getVolumeName(),
+ testBucket.getName(), testBucket.getName() + "snap2"));
+ List<Table.KeyValue<String, String>> aosRenamesKeyEntries =
+ om.getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(),
+ testBucket.getName(), "", 1000);
+ List<Table.KeyValue<String, List<OmKeyInfo>>> aosDeletedKeyEntries =
+ om.getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(),
+ testBucket.getName(), "", 1000);
+ List<Table.KeyValue<String, OmKeyInfo>> aosDeletedDirEntries =
+ om.getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(),
+ testBucket.getName(), 1000);
+ renamesKeyEntries.forEach(entry ->
Assertions.assertTrue(aosRenamesKeyEntries.contains(entry)));
+ deletedKeyEntries.forEach(entry ->
Assertions.assertTrue(aosDeletedKeyEntries.contains(entry)));
Review Comment:
Can we also assert that these keys have been removed from the snapshot that
is being deleted?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]