This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 349693515f HDDS-10217. Speed up TestKeyDeletingService (#6151)
349693515f is described below

commit 349693515ff0179398e7ddf02921b818d6cf1cef
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sat Feb 3 09:23:16 2024 +0100

    HDDS-10217. Speed up TestKeyDeletingService (#6151)
---
 .../ozone/om/service/TestKeyDeletingService.java   | 1038 ++++++++++----------
 1 file changed, 503 insertions(+), 535 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 143d483bb1..77bf15ed76 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -21,18 +21,18 @@ package org.apache.hadoop.ozone.om.service;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -53,8 +53,13 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
+import org.apache.ozone.test.OzoneTestBase;
 import org.apache.ratis.util.ExitUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,30 +68,27 @@ import 
org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
 
-import org.apache.commons.lang3.RandomStringUtils;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -99,607 +101,533 @@ import org.junit.jupiter.api.Timeout;
  * Metadata Manager. 3. Waits for a while for the KeyDeleting Service to pick 
up
  * and call into SCM. 4. Confirms that calls have been successful.
  */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @Timeout(300)
-public class TestKeyDeletingService {
-  @TempDir
-  private Path folder;
-  private OzoneManagerProtocol writeClient;
-  private OzoneManager om;
+class TestKeyDeletingService extends OzoneTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestKeyDeletingService.class);
+  private static final AtomicInteger OBJECT_COUNTER = new AtomicInteger();
+
+  private OzoneConfiguration conf;
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private KeyManager keyManager;
+  private OMMetadataManager metadataManager;
+  private KeyDeletingService keyDeletingService;
+  private ScmBlockLocationTestingClient scmBlockTestingClient;
 
   @BeforeAll
-  public static void setup() {
+  void setup() {
     ExitUtils.disableSystemExit();
   }
 
-  private OzoneConfiguration createConfAndInitValues() throws IOException {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    File newFolder = folder.toFile();
-    if (!newFolder.exists()) {
-      assertTrue(newFolder.mkdirs());
-    }
+  private void createConfig(File testDir) {
+    conf = new OzoneConfiguration();
     System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
-    ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
-    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1000,
-        TimeUnit.MILLISECONDS);
+    ServerUtils.setOzoneMetaDirPath(conf, testDir.toString());
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
+        100, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
         100, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
-        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
+        1, TimeUnit.SECONDS);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL,
+        200, TimeUnit.MILLISECONDS);
     conf.setQuietMode(false);
-
-    return conf;
   }
 
-  @AfterEach
-  public void cleanup() throws Exception {
-    om.stop();
+  private void createSubject() throws Exception {
+    OmTestManagers omTestManagers = new OmTestManagers(conf, 
scmBlockTestingClient, null);
+    keyManager = omTestManagers.getKeyManager();
+    keyDeletingService = keyManager.getDeletingService();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+    metadataManager = omTestManagers.getMetadataManager();
   }
 
   /**
-   * In this test, we create a bunch of keys and delete them. Then we start the
-   * KeyDeletingService and pass a SCMClient which does not fail. We make sure
-   * that all the keys that we deleted is picked up and deleted by
-   * OzoneManager.
-   *
-   * @throws IOException - on Failure.
+   * Tests happy path.
    */
+  @Nested
+  @TestInstance(TestInstance.Lifecycle.PER_CLASS)
+  class Normal {
 
-  @Test
-  public void checkIfDeleteServiceIsDeletingKeys()
-      throws IOException, TimeoutException, InterruptedException,
-      AuthenticationException {
-    OzoneConfiguration conf = createConfAndInitValues();
-    OmTestManagers omTestManagers
-        = new OmTestManagers(conf);
-    KeyManager keyManager = omTestManagers.getKeyManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
+    @BeforeAll
+    void setup(@TempDir File testDir) throws Exception {
+      // failCallsFrequency = 0 means all calls succeed
+      scmBlockTestingClient = new ScmBlockLocationTestingClient(null, null, 0);
 
-    final int keyCount = 100;
-    createAndDeleteKeys(keyManager, keyCount, 1);
-    KeyDeletingService keyDeletingService =
-        (KeyDeletingService) keyManager.getDeletingService();
-    GenericTestUtils.waitFor(
-        () -> keyDeletingService.getDeletedKeyCount().get() >= keyCount,
-        1000, 10000);
-    
assertThat(keyDeletingService.getRunCount().get()).isGreaterThanOrEqualTo(1);
-    assertEquals(0, keyManager.getPendingDeletionKeys(
-        Integer.MAX_VALUE).getKeyBlocksList().size());
-  }
+      createConfig(testDir);
+      createSubject();
+    }
 
-  @Test
-  public void checkIfDeleteServiceWithFailingSCM()
-      throws IOException, TimeoutException, InterruptedException,
-      AuthenticationException {
-    OzoneConfiguration conf = createConfAndInitValues();
-    ScmBlockLocationProtocol blockClient =
-        //failCallsFrequency = 1 , means all calls fail.
-        new ScmBlockLocationTestingClient(null, null, 1);
-    OmTestManagers omTestManagers
-        = new OmTestManagers(conf, blockClient, null);
-    KeyManager keyManager = omTestManagers.getKeyManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
+    @AfterEach
+    void resume() {
+      keyDeletingService.resume();
+    }
 
-    final int keyCount = 100;
-    createAndDeleteKeys(keyManager, keyCount, 1);
-    KeyDeletingService keyDeletingService =
-        (KeyDeletingService) keyManager.getDeletingService();
-    GenericTestUtils.waitFor(
-        () -> {
-          try {
-            int numPendingDeletionKeys =
-                keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
-                    .getKeyBlocksList().size();
-            if (numPendingDeletionKeys != keyCount) {
-              LOG.info("Expected {} keys to be pending deletion, but got {}",
-                  keyCount, numPendingDeletionKeys);
-              return false;
-            }
-            return true;
-          } catch (IOException e) {
-            LOG.error("Error while getting pending deletion keys.", e);
-            return false;
-          }
-        }, 100, 2000);
-    // Make sure that we have run the background thread 5 times more
-    GenericTestUtils.waitFor(
-        () -> keyDeletingService.getRunCount().get() >= 5,
-        100, 10000);
-    // Since SCM calls are failing, deletedKeyCount should be zero.
-    assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
-    assertEquals(keyCount, keyManager
-        .getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList().size());
-  }
+    @AfterAll
+    void cleanup() {
+      if (om.stop()) {
+        om.join();
+      }
+    }
 
-  @Test
-  public void checkDeletionForEmptyKey()
-      throws IOException, TimeoutException, InterruptedException,
-      AuthenticationException {
-    OzoneConfiguration conf = createConfAndInitValues();
-    ScmBlockLocationProtocol blockClient =
-        //failCallsFrequency = 1 , means all calls fail.
-        new ScmBlockLocationTestingClient(null, null, 1);
-    OmTestManagers omTestManagers
-        = new OmTestManagers(conf, blockClient, null);
-    KeyManager keyManager = omTestManagers.getKeyManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
+    /**
+     * In this test, we create a bunch of keys and delete them. Then we start 
the
+     * KeyDeletingService and pass a SCMClient which does not fail. We make 
sure
+     * that all the keys that we deleted are picked up and deleted by
+     * OzoneManager.
+     */
+    @Test
+    void checkIfDeleteServiceIsDeletingKeys()
+        throws IOException, TimeoutException, InterruptedException {
+      final long initialDeletedCount = getDeletedKeyCount();
+      final long initialRunCount = getRunCount();
+
+      final int keyCount = 100;
+      createAndDeleteKeys(keyCount, 1);
+
+      GenericTestUtils.waitFor(
+          () -> getDeletedKeyCount() >= initialDeletedCount + keyCount,
+          100, 10000);
+      assertThat(getRunCount()).isGreaterThan(initialRunCount);
+      
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+          .isEmpty();
+    }
 
-    final int keyCount = 100;
-    createAndDeleteKeys(keyManager, keyCount, 0);
-    KeyDeletingService keyDeletingService =
-        (KeyDeletingService) keyManager.getDeletingService();
-
-    // the pre-allocated blocks are not committed, hence they will be deleted.
-    GenericTestUtils.waitFor(
-        () -> {
-          try {
-            int numPendingDeletionKeys =
-                keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
-                    .getKeyBlocksList().size();
-            if (numPendingDeletionKeys != keyCount) {
-              LOG.info("Expected {} keys to be pending deletion, but got {}",
-                  keyCount, numPendingDeletionKeys);
-              return false;
-            }
-            return true;
-          } catch (IOException e) {
-            LOG.error("Error while getting pending deletion keys.", e);
-            return false;
-          }
-        }, 100, 2000);
-
-    // Make sure that we have run the background thread 2 times or more
-    GenericTestUtils.waitFor(
-        () -> keyDeletingService.getRunCount().get() >= 2,
-        100, 1000);
-    // the blockClient is set to fail the deletion of key blocks, hence no keys
-    // will be deleted
-    assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
-  }
+    @Test
+    void checkDeletionForKeysWithMultipleVersions() throws Exception {
+      final long initialDeletedCount = getDeletedKeyCount();
+      final long initialRunCount = getRunCount();
+      final int initialDeletedBlockCount = 
scmBlockTestingClient.getNumberOfDeletedBlocks();
 
-  @Test
-  public void checkDeletionForPartiallyCommitKey()
-      throws IOException, TimeoutException, InterruptedException,
-      AuthenticationException {
-    OzoneConfiguration conf = createConfAndInitValues();
-    ScmBlockLocationProtocol blockClient =
-        //failCallsFrequency = 1 , means all calls fail.
-        new ScmBlockLocationTestingClient(null, null, 1);
-    OmTestManagers omTestManagers
-        = new OmTestManagers(conf, blockClient, null);
-    KeyManager keyManager = omTestManagers.getKeyManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
 
-    String volumeName = String.format("volume%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    String bucketName = String.format("bucket%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    String keyName = String.format("key%s",
-        RandomStringUtils.randomAlphanumeric(5));
-
-    // Create Volume and Bucket
-    createVolumeAndBucket(keyManager, volumeName, bucketName, false);
-
-    OmKeyArgs keyArg = createAndCommitKey(keyManager, volumeName, bucketName,
-        keyName, 3, 1);
-
-    // Only the uncommitted block should be pending to be deleted.
-    GenericTestUtils.waitFor(
-        () -> {
-          try {
-            return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
-                .getKeyBlocksList()
-                .stream()
-                .map(BlockGroup::getBlockIDList)
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList()).size() == 1;
-          } catch (IOException e) {
-            e.printStackTrace();
-          }
-          return false;
-        },
-        500, 3000);
-
-    // Delete the key
-    writeClient.deleteKey(keyArg);
+      // Create Volume and Bucket with versioning enabled
+      createVolumeAndBucket(volumeName, bucketName, true);
 
-    KeyDeletingService keyDeletingService =
-        (KeyDeletingService) keyManager.getDeletingService();
-
-    // All blocks should be pending to be deleted.
-    GenericTestUtils.waitFor(
-        () -> {
-          try {
-            return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
-                .getKeyBlocksList()
-                .stream()
-                .map(BlockGroup::getBlockIDList)
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList()).size() == 3;
-          } catch (IOException e) {
-            e.printStackTrace();
-          }
-          return false;
-        },
-        500, 3000);
-
-    // the blockClient is set to fail the deletion of key blocks, hence no keys
-    // will be deleted
-    assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
-  }
+      // Create 2 versions of the same key
+      final String keyName = uniqueObjectName("key");
+      OmKeyArgs keyArgs = createAndCommitKey(volumeName, bucketName, keyName, 
1);
+      createAndCommitKey(volumeName, bucketName, keyName, 2);
 
-  @Test
-  public void checkDeletionForKeysWithMultipleVersions()
-      throws IOException, TimeoutException, InterruptedException,
-      AuthenticationException {
-    OzoneConfiguration conf = createConfAndInitValues();
-    OmTestManagers omTestManagers = new OmTestManagers(conf);
-    KeyManager keyManager = omTestManagers.getKeyManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
+      // Delete the key
+      writeClient.deleteKey(keyArgs);
+
+      GenericTestUtils.waitFor(
+          () -> getDeletedKeyCount() >= initialDeletedCount + 1,
+          1000, 10000);
+      assertThat(getRunCount())
+          .isGreaterThan(initialRunCount);
+      
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+          .isEmpty();
+
+      // The 1st version of the key has 1 block and the 2nd version has 2
+      // blocks. Hence, the ScmBlockClient should have received at least 3
+      // blocks for deletion from the KeyDeletingService
+      assertThat(scmBlockTestingClient.getNumberOfDeletedBlocks())
+          .isGreaterThanOrEqualTo(initialDeletedBlockCount + 3);
+    }
 
-    String volumeName = String.format("volume%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    String bucketName = String.format("bucket%s",
-        RandomStringUtils.randomAlphanumeric(5));
-
-    // Create Volume and Bucket with versioning enabled
-    createVolumeAndBucket(keyManager, volumeName, bucketName, true);
-
-    // Create 2 versions of the same key
-    String keyName = String.format("key%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    OmKeyArgs keyArgs = createAndCommitKey(keyManager, volumeName, bucketName,
-        keyName, 1);
-    createAndCommitKey(keyManager, volumeName, bucketName, keyName, 2);
-
-    // Delete the key
-    writeClient.deleteKey(keyArgs);
-
-    KeyDeletingService keyDeletingService =
-        (KeyDeletingService) keyManager.getDeletingService();
-    GenericTestUtils.waitFor(
-        () -> keyDeletingService.getDeletedKeyCount().get() >= 1,
-        1000, 10000);
-    
assertThat(keyDeletingService.getRunCount().get()).isGreaterThanOrEqualTo(1);
-    assertEquals(0, keyManager.getPendingDeletionKeys(
-        Integer.MAX_VALUE).getKeyBlocksList().size());
-
-    // The 1st version of the key has 1 block and the 2nd version has 2
-    // blocks. Hence, the ScmBlockClient should have received atleast 3
-    // blocks for deletion from the KeyDeletionService
-    ScmBlockLocationTestingClient scmBlockTestingClient =
-        (ScmBlockLocationTestingClient) omTestManagers.getScmBlockClient();
-    
assertThat(scmBlockTestingClient.getNumberOfDeletedBlocks()).isGreaterThanOrEqualTo(3);
-  }
+    @Test
+    void checkDeletedTableCleanUpForSnapshot() throws Exception {
+      final String volumeName = getTestName();
+      final String bucketName1 = uniqueObjectName("bucket");
+      final String bucketName2 = uniqueObjectName("bucket");
+      final String keyName = uniqueObjectName("key");
 
-  private void createAndDeleteKeys(KeyManager keyManager, int keyCount,
-      int numBlocks) throws IOException {
-    for (int x = 0; x < keyCount; x++) {
-      String volumeName = String.format("volume%s",
-          RandomStringUtils.randomAlphanumeric(5));
-      String bucketName = String.format("bucket%s",
-          RandomStringUtils.randomAlphanumeric(5));
-      String keyName = String.format("key%s",
-          RandomStringUtils.randomAlphanumeric(5));
+      final long initialDeletedCount = getDeletedKeyCount();
+      final long initialRunCount = getRunCount();
 
-      // Create Volume and Bucket
-      createVolumeAndBucket(keyManager, volumeName, bucketName, false);
+      // Create Volume and Buckets
+      createVolumeAndBucket(volumeName, bucketName1, false);
+      createVolumeAndBucket(volumeName, bucketName2, false);
 
-      // Create the key
-      OmKeyArgs keyArg = createAndCommitKey(keyManager, volumeName, bucketName,
-          keyName, numBlocks);
+      // Create the keys
+      OmKeyArgs key1 = createAndCommitKey(volumeName, bucketName1, keyName, 3);
+      OmKeyArgs key2 = createAndCommitKey(volumeName, bucketName2, keyName, 3);
+
+      // Create snapshot
+      String snapName = uniqueObjectName("snap");
+      writeClient.createSnapshot(volumeName, bucketName1, snapName);
 
       // Delete the key
-      writeClient.deleteKey(keyArg);
+      writeClient.deleteKey(key1);
+      writeClient.deleteKey(key2);
+
+      // Run KeyDeletingService
+      GenericTestUtils.waitFor(
+          () -> getDeletedKeyCount() >= initialDeletedCount + 1,
+          1000, 10000);
+      assertThat(getRunCount())
+          .isGreaterThan(initialRunCount);
+      
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+          .isEmpty();
+
+      // deletedTable should have deleted key of the snapshot bucket
+      assertFalse(metadataManager.getDeletedTable().isEmpty());
+      String ozoneKey1 =
+          metadataManager.getOzoneKey(volumeName, bucketName1, keyName);
+      String ozoneKey2 =
+          metadataManager.getOzoneKey(volumeName, bucketName2, keyName);
+
+      // key1 belongs to snapshot, so it should not be deleted when
+      // KeyDeletingService runs. But key2 can be reclaimed as it doesn't
+      // belong to any snapshot scope.
+      List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> rangeKVs
+          = metadataManager.getDeletedTable().getRangeKVs(
+          null, 100, ozoneKey1);
+      assertThat(rangeKVs.size()).isGreaterThan(0);
+      rangeKVs
+          = metadataManager.getDeletedTable().getRangeKVs(
+          null, 100, ozoneKey2);
+      assertEquals(0, rangeKVs.size());
     }
-  }
 
-  @Test
-  public void checkDeletedTableCleanUpForSnapshot()
-      throws Exception {
-    OzoneConfiguration conf = createConfAndInitValues();
-    OmTestManagers omTestManagers
-        = new OmTestManagers(conf);
-    KeyManager keyManager = omTestManagers.getKeyManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
-    OMMetadataManager metadataManager = omTestManagers.getMetadataManager();
-
-    String volumeName = String.format("volume%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    String bucketName1 = String.format("bucket%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    String bucketName2 = String.format("bucket%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    String keyName = String.format("key%s",
-        RandomStringUtils.randomAlphanumeric(5));
-
-    // Create Volume and Buckets
-    createVolumeAndBucket(keyManager, volumeName, bucketName1, false);
-    createVolumeAndBucket(keyManager, volumeName, bucketName2, false);
-
-    // Create the keys
-    OmKeyArgs key1 = createAndCommitKey(keyManager, volumeName, bucketName1,
-        keyName, 3);
-    OmKeyArgs key2 = createAndCommitKey(keyManager, volumeName, bucketName2,
-        keyName, 3);
-
-    // Create snapshot
-    String snapName = "snap1";
-    writeClient.createSnapshot(volumeName, bucketName1, snapName);
-
-    // Delete the key
-    writeClient.deleteKey(key1);
-    writeClient.deleteKey(key2);
-
-    // Run KeyDeletingService
-    KeyDeletingService keyDeletingService =
-        (KeyDeletingService) keyManager.getDeletingService();
-    GenericTestUtils.waitFor(
-        () -> keyDeletingService.getDeletedKeyCount().get() >= 1,
-        1000, 10000);
-    
assertThat(keyDeletingService.getRunCount().get()).isGreaterThanOrEqualTo(1);
-    assertEquals(0, keyManager
-        .getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList().size());
-
-    // deletedTable should have deleted key of the snapshot bucket
-    assertFalse(metadataManager.getDeletedTable().isEmpty());
-    String ozoneKey1 =
-        metadataManager.getOzoneKey(volumeName, bucketName1, keyName);
-    String ozoneKey2 =
-        metadataManager.getOzoneKey(volumeName, bucketName2, keyName);
-
-    // key1 belongs to snapshot, so it should not be deleted when
-    // KeyDeletingService runs. But key2 can be reclaimed as it doesn't
-    // belong to any snapshot scope.
-    List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> rangeKVs
-        = metadataManager.getDeletedTable().getRangeKVs(
-        null, 100, ozoneKey1);
-    assertThat(rangeKVs.size()).isGreaterThan(0);
-    rangeKVs
-        = metadataManager.getDeletedTable().getRangeKVs(
-        null, 100, ozoneKey2);
-    assertEquals(0, rangeKVs.size());
-  }
+    /*
+     * Create Snap1
+     * Create 10 keys
+     * Create Snap2
+     * Delete 10 keys
+     * Create 5 keys
+     * Delete 5 keys -> but stop KeyDeletingService so
+       that keys won't be reclaimed.
+     * Create snap3
+     * Now wait for snap3 to be deepCleaned -> Deleted 5
+       keys should be deep cleaned.
+     * Now delete snap2 -> Wait for snap3 to be deep cleaned so deletedTable
+       of Snap3 should be empty.
+     */
+    @Test
+    void testSnapshotDeepClean() throws Exception {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          om.getMetadataManager().getSnapshotInfoTable();
+      Table<String, RepeatedOmKeyInfo> deletedTable =
+          om.getMetadataManager().getDeletedTable();
+      Table<String, OmKeyInfo> keyTable =
+          om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT);
+
+      // Suspend KeyDeletingService
+      keyDeletingService.suspend();
+
+      final long initialSnapshotCount = 
metadataManager.countRowsInTable(snapshotInfoTable);
+      final long initialKeyCount = metadataManager.countRowsInTable(keyTable);
+      final long initialDeletedCount = 
metadataManager.countRowsInTable(deletedTable);
+
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+
+      // Create Volume and Buckets
+      createVolumeAndBucket(volumeName, bucketName, false);
+
+      writeClient.createSnapshot(volumeName, bucketName, 
uniqueObjectName("snap"));
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, 
metadataManager);
+
+      List<OmKeyArgs> createdKeys = new ArrayList<>();
+      for (int i = 1; i <= 10; i++) {
+        OmKeyArgs args = createAndCommitKey(volumeName, bucketName,
+            uniqueObjectName("key"), 3);
+        createdKeys.add(args);
+      }
+      assertTableRowCount(keyTable, initialKeyCount + 10, metadataManager);
 
-  /*
-   * Create Snap1
-   * Create 10 keys
-   * Create Snap2
-   * Delete 10 keys
-   * Create 5 keys
-   * Delete 5 keys -> but stop KeyDeletingService so
-     that keys won't be reclaimed.
-   * Create snap3
-t
-   * Now wait for snap3 to be deepCleaned -> Deleted 5
-     keys should be deep cleaned.
-   * Now delete snap2 -> Wait for snap3 to be deep cleaned so deletedTable
-     of Snap3 should be empty.
-   */
-  @Test
-  public void testSnapshotDeepClean() throws Exception {
-    OzoneConfiguration conf = createConfAndInitValues();
-    OmTestManagers omTestManagers
-        = new OmTestManagers(conf);
-    KeyManager keyManager = omTestManagers.getKeyManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
-    OMMetadataManager metadataManager = omTestManagers.getMetadataManager();
-    Table<String, SnapshotInfo> snapshotInfoTable =
-        om.getMetadataManager().getSnapshotInfoTable();
-    Table<String, RepeatedOmKeyInfo> deletedTable =
-        om.getMetadataManager().getDeletedTable();
-    Table<String, OmKeyInfo> keyTable =
-        om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT);
-
-    KeyDeletingService keyDeletingService = keyManager.getDeletingService();
-    // Suspend KeyDeletingService
-    keyDeletingService.suspend();
-
-    String volumeName = String.format("volume%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    String bucketName = String.format("bucket%s",
-        RandomStringUtils.randomAlphanumeric(5));
-    String keyName = String.format("key%s",
-        RandomStringUtils.randomAlphanumeric(5));
-
-    // Create Volume and Buckets
-    createVolumeAndBucket(keyManager, volumeName, bucketName, false);
-
-    writeClient.createSnapshot(volumeName, bucketName, "snap1");
-    assertTableRowCount(snapshotInfoTable, 1, metadataManager);
-
-    List<OmKeyArgs> createdKeys = new ArrayList<>();
-    for (int i = 1; i <= 10; i++) {
-      OmKeyArgs args = createAndCommitKey(keyManager, volumeName, bucketName,
-          keyName + i, 3);
-      createdKeys.add(args);
-    }
-    assertTableRowCount(keyTable, 10, metadataManager);
+      String snap2 = uniqueObjectName("snap");
+      writeClient.createSnapshot(volumeName, bucketName, snap2);
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, 
metadataManager);
+
+      // Create 5 Keys
+      for (int i = 11; i <= 15; i++) {
+        OmKeyArgs args = createAndCommitKey(volumeName, bucketName,
+            uniqueObjectName("key"), 3);
+        createdKeys.add(args);
+      }
 
-    writeClient.createSnapshot(volumeName, bucketName, "snap2");
-    assertTableRowCount(snapshotInfoTable, 2, metadataManager);
+      // Delete all 15 keys.
+      for (int i = 0; i < 15; i++) {
+        writeClient.deleteKey(createdKeys.get(i));
+      }
 
-    // Create 5 Keys
-    for (int i = 11; i <= 15; i++) {
-      OmKeyArgs args = createAndCommitKey(keyManager, volumeName, bucketName,
-          keyName + i, 3);
-      createdKeys.add(args);
-    }
+      assertTableRowCount(deletedTable, initialDeletedCount + 15, 
metadataManager);
 
-    // Delete all 15 keys.
-    for (int i = 0; i < 15; i++) {
-      writeClient.deleteKey(createdKeys.get(i));
-    }
+      // Create Snap3, traps all the deleted keys.
+      String snap3 = uniqueObjectName("snap");
+      writeClient.createSnapshot(volumeName, bucketName, snap3);
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 3, 
metadataManager);
+      checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, false);
 
-    assertTableRowCount(deletedTable, 15, metadataManager);
+      keyDeletingService.resume();
 
-    // Create Snap3, traps all the deleted keys.
-    writeClient.createSnapshot(volumeName, bucketName, "snap3");
-    assertTableRowCount(snapshotInfoTable, 3, metadataManager);
-    checkSnapDeepCleanStatus(snapshotInfoTable, false);
+      try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot =
+               om.getOmSnapshotManager().checkForSnapshot(
+                   volumeName, bucketName, getSnapshotPrefix(snap3), true)) {
+        OmSnapshot snapshot3 = (OmSnapshot) rcOmSnapshot.get();
 
-    keyDeletingService.resume();
+        Table<String, RepeatedOmKeyInfo> snap3deletedTable =
+            snapshot3.getMetadataManager().getDeletedTable();
 
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot =
-        om.getOmSnapshotManager().checkForSnapshot(
-            volumeName, bucketName, getSnapshotPrefix("snap3"), true)) {
-      OmSnapshot snap3 = (OmSnapshot) rcOmSnapshot.get();
+        // 5 keys can be deep cleaned as it was stuck previously
+        assertTableRowCount(snap3deletedTable, initialDeletedCount + 10, 
metadataManager);
 
-      Table<String, RepeatedOmKeyInfo> snap3deletedTable =
-          snap3.getMetadataManager().getDeletedTable();
+        writeClient.deleteSnapshot(volumeName, bucketName, snap2);
+        assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, 
metadataManager);
 
-      // 5 keys can be deep cleaned as it was stuck previously
-      assertTableRowCount(snap3deletedTable, 10, metadataManager);
+        assertTableRowCount(snap3deletedTable, initialDeletedCount, 
metadataManager);
+        assertTableRowCount(deletedTable, initialDeletedCount, 
metadataManager);
+        checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, true);
+      }
+    }
 
-      writeClient.deleteSnapshot(volumeName, bucketName, "snap2");
-      assertTableRowCount(snapshotInfoTable, 2, metadataManager);
+    @Test
+    void testSnapshotExclusiveSize() throws Exception {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          om.getMetadataManager().getSnapshotInfoTable();
+      Table<String, RepeatedOmKeyInfo> deletedTable =
+          om.getMetadataManager().getDeletedTable();
+      Table<String, String> renamedTable =
+          om.getMetadataManager().getSnapshotRenamedTable();
+      Table<String, OmKeyInfo> keyTable =
+          om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT);
+
+      // Suspend KDS
+      keyDeletingService.suspend();
+
+      final long initialSnapshotCount = 
metadataManager.countRowsInTable(snapshotInfoTable);
+      final long initialKeyCount = metadataManager.countRowsInTable(keyTable);
+      final long initialDeletedCount = 
metadataManager.countRowsInTable(deletedTable);
+      final long initialRenamedCount = 
metadataManager.countRowsInTable(renamedTable);
+
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      final String keyName = uniqueObjectName("key");
+
+      // Create Volume and Buckets
+      createVolumeAndBucket(volumeName, bucketName, false);
+
+      // Create 3 keys
+      for (int i = 1; i <= 3; i++) {
+        createAndCommitKey(volumeName, bucketName, keyName + i, 3);
+      }
+      assertTableRowCount(keyTable, initialKeyCount + 3, metadataManager);
+
+      // Create Snapshot1
+      String snap1 = uniqueObjectName("snap");
+      writeClient.createSnapshot(volumeName, bucketName, snap1);
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, 
metadataManager);
+      assertTableRowCount(deletedTable, initialDeletedCount, metadataManager);
+
+      // Create 2 keys
+      for (int i = 4; i <= 5; i++) {
+        createAndCommitKey(volumeName, bucketName, keyName + i, 3);
+      }
+      // Delete a key, rename 2 keys. We will be using this to test
+      // how we handle renamed key for exclusive size calculation.
+      renameKey(volumeName, bucketName, keyName + 1, "renamedKey1");
+      renameKey(volumeName, bucketName, keyName + 2, "renamedKey2");
+      deleteKey(volumeName, bucketName, keyName + 3);
+      assertTableRowCount(deletedTable, initialDeletedCount + 1, 
metadataManager);
+      assertTableRowCount(renamedTable, initialRenamedCount + 2, 
metadataManager);
+
+      // Create Snapshot2
+      String snap2 = uniqueObjectName("snap");
+      writeClient.createSnapshot(volumeName, bucketName, snap2);
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, 
metadataManager);
+      assertTableRowCount(deletedTable, initialDeletedCount, metadataManager);
+
+      // Create 2 keys
+      for (int i = 6; i <= 7; i++) {
+        createAndCommitKey(volumeName, bucketName, keyName + i, 3);
+      }
+
+      deleteKey(volumeName, bucketName, "renamedKey1");
+      deleteKey(volumeName, bucketName, keyName + 4);
+      // Do a second rename of already renamedKey2
+      renameKey(volumeName, bucketName, "renamedKey2", "renamedKey22");
+      assertTableRowCount(deletedTable, initialDeletedCount + 2, 
metadataManager);
+      assertTableRowCount(renamedTable, initialRenamedCount + 1, 
metadataManager);
+
+      // Create Snapshot3
+      String snap3 = uniqueObjectName("snap");
+      writeClient.createSnapshot(volumeName, bucketName, snap3);
+      // Delete 4 keys
+      deleteKey(volumeName, bucketName, "renamedKey22");
+      for (int i = 5; i <= 7; i++) {
+        deleteKey(volumeName, bucketName, keyName + i);
+      }
 
-      assertTableRowCount(snap3deletedTable, 0, metadataManager);
-      assertTableRowCount(deletedTable, 0, metadataManager);
-      checkSnapDeepCleanStatus(snapshotInfoTable, true);
+      // Create Snapshot4
+      String snap4 = uniqueObjectName("snap");
+      writeClient.createSnapshot(volumeName, bucketName, snap4);
+      createAndCommitKey(volumeName, bucketName, uniqueObjectName("key"), 3);
+
+      long prevKdsRunCount = getRunCount();
+      keyDeletingService.resume();
+
+      Map<String, Long> expectedSize = new ImmutableMap.Builder<String, Long>()
+          .put(snap1, 1000L)
+          .put(snap2, 1000L)
+          .put(snap3, 2000L)
+          .put(snap4, 0L)
+          .build();
+
+      // Let KeyDeletingService run for some iterations
+      GenericTestUtils.waitFor(
+          () -> (getRunCount() > prevKdsRunCount + 5),
+          100, 10000);
+
+      // Check if the exclusive size is set.
+      try (TableIterator<String, ? extends Table.KeyValue<String, 
SnapshotInfo>>
+               iterator = snapshotInfoTable.iterator()) {
+        while (iterator.hasNext()) {
+          Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
+          String snapshotName = snapshotEntry.getValue().getName();
+          Long expected = expectedSize.getOrDefault(snapshotName, 0L);
+          assertNotNull(expected);
+          assertEquals(expected, snapshotEntry.getValue().getExclusiveSize());
+          // Since for the test we are using RATIS/THREE
+          assertEquals(expected * 3, 
snapshotEntry.getValue().getExclusiveReplicatedSize());
+        }
+      }
     }
   }
 
-  @Test
-  public void testSnapshotExclusiveSize() throws Exception {
-    OzoneConfiguration conf = createConfAndInitValues();
-    OmTestManagers omTestManagers
-        = new OmTestManagers(conf);
-    KeyManager keyManager = omTestManagers.getKeyManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
-    OMMetadataManager metadataManager = omTestManagers.getMetadataManager();
-    Table<String, SnapshotInfo> snapshotInfoTable =
-        om.getMetadataManager().getSnapshotInfoTable();
-    Table<String, RepeatedOmKeyInfo> deletedTable =
-        om.getMetadataManager().getDeletedTable();
-    Table<String, String> renamedTable =
-        om.getMetadataManager().getSnapshotRenamedTable();
-    Table<String, OmKeyInfo> keyTable =
-        om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT);
-
-    KeyDeletingService keyDeletingService = keyManager.getDeletingService();
-    // Supspend KDS
-    keyDeletingService.suspend();
-
-    String volumeName = "volume1";
-    String bucketName = "bucket1";
-    String keyName = "key";
-
-    // Create Volume and Buckets
-    createVolumeAndBucket(keyManager, volumeName, bucketName, false);
-
-    // Create 3 keys
-    for (int i = 1; i <= 3; i++) {
-      createAndCommitKey(keyManager, volumeName, bucketName, keyName + i, 3);
+  /**
+   * Tests failure scenarios.
+   */
+  @Nested
+  @TestInstance(TestInstance.Lifecycle.PER_CLASS)
+  class Failing {
+
+    @BeforeAll
+    void setup(@TempDir File testDir) throws Exception {
+      // failCallsFrequency = 1 means all calls fail
+      scmBlockTestingClient = new ScmBlockLocationTestingClient(null, null, 1);
+      createConfig(testDir);
+      createSubject();
     }
-    assertTableRowCount(keyTable, 3, metadataManager);
 
-    // Create Snapshot1
-    writeClient.createSnapshot(volumeName, bucketName, "snap1");
-    assertTableRowCount(snapshotInfoTable, 1, metadataManager);
-    assertTableRowCount(deletedTable, 0, metadataManager);
+    @AfterEach
+    void resume() {
+      keyDeletingService.resume();
+    }
 
-    // Create 2 keys
-    for (int i = 4; i <= 5; i++) {
-      createAndCommitKey(keyManager, volumeName, bucketName, keyName + i, 3);
+    @AfterAll
+    void cleanup() {
+      if (om.stop()) {
+        om.join();
+      }
     }
-    // Delete a key, rename 2 keys. We will be using this to test
-    // how we handle renamed key for exclusive size calculation.
-    renameKey(volumeName, bucketName, keyName + 1, "renamedKey1");
-    renameKey(volumeName, bucketName, keyName + 2, "renamedKey2");
-    deleteKey(volumeName, bucketName, keyName + 3);
-    assertTableRowCount(deletedTable, 1, metadataManager);
-    assertTableRowCount(renamedTable, 2, metadataManager);
-
-    // Create Snapshot2
-    writeClient.createSnapshot(volumeName, bucketName, "snap2");
-    assertTableRowCount(snapshotInfoTable, 2, metadataManager);
-    assertTableRowCount(deletedTable, 0, metadataManager);
-
-    // Create 2 keys
-    for (int i = 6; i <= 7; i++) {
-      createAndCommitKey(keyManager, volumeName, bucketName, keyName + i, 3);
+
+    @Test
+    void checkIfDeleteServiceWithFailingSCM() throws Exception {
+      final int initialCount = countKeysPendingDeletion();
+      final long initialRunCount = getRunCount();
+      final int keyCount = 100;
+
+      createAndDeleteKeys(keyCount, 1);
+
+      GenericTestUtils.waitFor(
+          () -> countKeysPendingDeletion() == initialCount + keyCount,
+          100, 2000);
+      // Make sure that the background thread has run at least 5 more times
+      GenericTestUtils.waitFor(
+          () -> getRunCount() >= initialRunCount + 5,
+          100, 10000);
+      // Since SCM calls are failing, deletedKeyCount should be zero.
+      assertEquals(0, getDeletedKeyCount());
+      assertEquals(initialCount + keyCount, countKeysPendingDeletion());
     }
 
-    deleteKey(volumeName, bucketName, "renamedKey1");
-    deleteKey(volumeName, bucketName, "key4");
-    // Do a second rename of already renamedKey2
-    renameKey(volumeName, bucketName, "renamedKey2", "renamedKey22");
-    assertTableRowCount(deletedTable, 2, metadataManager);
-    assertTableRowCount(renamedTable, 1, metadataManager);
-
-    // Create Snapshot3
-    writeClient.createSnapshot(volumeName, bucketName, "snap3");
-    // Delete 4 keys
-    deleteKey(volumeName, bucketName, "renamedKey22");
-    for (int i = 5; i <= 7; i++) {
-      deleteKey(volumeName, bucketName, keyName + i);
+    @Test
+    void checkDeletionForEmptyKey() throws Exception {
+      final int initialCount = countKeysPendingDeletion();
+      final long initialRunCount = getRunCount();
+      final int keyCount = 100;
+
+      createAndDeleteKeys(keyCount, 0);
+
+      // the pre-allocated blocks are not committed, hence they will be deleted.
+      GenericTestUtils.waitFor(
+          () -> countKeysPendingDeletion() == initialCount + keyCount,
+          100, 2000);
+      // Make sure that we have run the background thread 2 times or more
+      GenericTestUtils.waitFor(
+          () -> getRunCount() >= initialRunCount + 2,
+          100, 1000);
+      // the blockClient is set to fail the deletion of key blocks, hence no keys
+      // will be deleted
+      assertEquals(0, getDeletedKeyCount());
     }
 
-    // Create Snapshot4
-    writeClient.createSnapshot(volumeName, bucketName, "snap4");
-    createAndCommitKey(keyManager, volumeName, bucketName, "key8", 3);
-    keyDeletingService.resume();
+    @Test
+    void checkDeletionForPartiallyCommitKey() throws Exception {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      final String keyName = uniqueObjectName("key");
+      final long initialCount = countBlocksPendingDeletion();
+      createVolumeAndBucket(volumeName, bucketName, false);
 
-    Map<String, Long> expectedSize = new HashMap<String, Long>() {{
-        put("snap1", 1000L);
-        put("snap2", 1000L);
-        put("snap3", 2000L);
-        put("snap4", 0L);
-      }};
+      OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, keyName, 
3, 1);
 
-    long prevKdsRunCount = keyDeletingService.getRunCount().get();
+      // Only the uncommitted block should be pending to be deleted.
+      GenericTestUtils.waitFor(
+          () -> countBlocksPendingDeletion() == initialCount + 1,
+          500, 3000);
 
-    // Let KeyDeletingService to run for some iterations
-    GenericTestUtils.waitFor(
-        () -> (keyDeletingService.getRunCount().get() > prevKdsRunCount + 5),
-        100, 10000);
+      writeClient.deleteKey(keyArg);
 
-    // Check if the exclusive size is set.
-    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
-             iterator = snapshotInfoTable.iterator()) {
-      while (iterator.hasNext()) {
-        Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
-        String snapshotName = snapshotEntry.getValue().getName();
-        assertEquals(expectedSize.get(snapshotName),
-            snapshotEntry.getValue().
-            getExclusiveSize());
-        // Since for the test we are using RATIS/THREE
-        assertEquals(expectedSize.get(snapshotName) * 3,
-            snapshotEntry.getValue().getExclusiveReplicatedSize());
-      }
+      // All blocks should be pending to be deleted.
+      GenericTestUtils.waitFor(
+          () -> countBlocksPendingDeletion() == initialCount + 3,
+          500, 3000);
+
+      // the blockClient is set to fail the deletion of key blocks, hence no keys
+      // will be deleted
+      assertEquals(0, getDeletedKeyCount());
     }
   }
 
-  private void checkSnapDeepCleanStatus(Table<String, SnapshotInfo>
-      snapshotInfoTable, boolean deepClean) throws IOException {
+  private void createAndDeleteKeys(int keyCount, int numBlocks) throws 
IOException {
+    for (int x = 0; x < keyCount; x++) {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      final String keyName = uniqueObjectName("key");
 
-    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
-             iterator = snapshotInfoTable.iterator()) {
+      // Create Volume and Bucket
+      createVolumeAndBucket(volumeName, bucketName, false);
+
+      // Create the key
+      OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName,
+          keyName, numBlocks);
+
+      // Delete the key
+      writeClient.deleteKey(keyArg);
+    }
+  }
+
+  private static void checkSnapDeepCleanStatus(Table<String, SnapshotInfo> 
table, String volumeName, boolean deepClean)
+      throws IOException {
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> 
iterator = table.iterator()) {
       while (iterator.hasNext()) {
         SnapshotInfo snapInfo = iterator.next().getValue();
-        assertEquals(snapInfo.getDeepClean(), deepClean);
+        if (volumeName.equals(snapInfo.getVolumeName())) {
+          assertThat(snapInfo.getDeepClean())
+              .as(snapInfo.toAuditMap().toString())
+              .isEqualTo(deepClean);
+        }
       }
     }
   }
 
-  private void assertTableRowCount(Table<String, ?> table,
-        int count, OMMetadataManager metadataManager)
+  private static void assertTableRowCount(Table<String, ?> table,
+        long count, OMMetadataManager metadataManager)
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> assertTableRowCount(count, table,
             metadataManager), 1000, 120000); // 2 minutes
   }
 
-  private boolean assertTableRowCount(int expectedCount,
+  private static boolean assertTableRowCount(long expectedCount,
                                       Table<String, ?> table,
                                       OMMetadataManager metadataManager) {
     long count = 0L;
@@ -713,7 +641,7 @@ t
     return count == expectedCount;
   }
 
-  private void createVolumeAndBucket(KeyManager keyManager, String volumeName,
+  private void createVolumeAndBucket(String volumeName,
       String bucketName, boolean isVersioningEnabled) throws IOException {
     // cheat here, just create a volume and bucket entry so that we can
     // create the keys, we put the same data for key and value since the
@@ -763,13 +691,13 @@ t
     writeClient.renameKey(keyArg, toKeyName);
   }
 
-  private OmKeyArgs createAndCommitKey(KeyManager keyManager, String 
volumeName,
+  private OmKeyArgs createAndCommitKey(String volumeName,
       String bucketName, String keyName, int numBlocks) throws IOException {
-    return createAndCommitKey(keyManager, volumeName, bucketName, keyName,
+    return createAndCommitKey(volumeName, bucketName, keyName,
         numBlocks, 0);
   }
 
-  private OmKeyArgs createAndCommitKey(KeyManager keyManager, String 
volumeName,
+  private OmKeyArgs createAndCommitKey(String volumeName,
       String bucketName, String keyName, int numBlocks, int numUncommitted)
       throws IOException {
     // Even if no key size is appointed, there will be at least one
@@ -818,4 +746,44 @@ t
     writeClient.commitKey(keyArg, session.getId());
     return keyArg;
   }
+
+  private long getDeletedKeyCount() {
+    final long count = keyDeletingService.getDeletedKeyCount().get();
+    LOG.debug("KeyDeletingService deleted keys: {}", count);
+    return count;
+  }
+
+  private long getRunCount() {
+    final long count = keyDeletingService.getRunCount().get();
+    LOG.debug("KeyDeletingService run count: {}", count);
+    return count;
+  }
+
+  private int countKeysPendingDeletion() {
+    try {
+      final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+          .getKeyBlocksList().size();
+      LOG.debug("KeyManager keys pending deletion: {}", count);
+      return count;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private long countBlocksPendingDeletion() {
+    try {
+      return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+          .getKeyBlocksList()
+          .stream()
+          .map(BlockGroup::getBlockIDList)
+          .mapToLong(Collection::size)
+          .sum();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static String uniqueObjectName(String prefix) {
+    return prefix + OBJECT_COUNTER.getAndIncrement();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to