This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 956385a449 HDDS-8771. Refactor volume level tmp directory for generic 
usage. (#4838)
956385a449 is described below

commit 956385a4496a634bd3f5232e7201cc9c1de8db22
Author: Ethan Rose <[email protected]>
AuthorDate: Sun Jun 25 19:49:21 2023 -0700

    HDDS-8771. Refactor volume level tmp directory for generic usage. (#4838)
---
 .../apache/hadoop/ozone/HddsDatanodeService.java   |  22 ---
 .../states/endpoint/VersionEndpointTask.java       |  17 +-
 .../container/common/utils/StorageVolumeUtil.java  |  78 +++++---
 .../ozone/container/common/volume/HddsVolume.java  | 213 ++++++++++++--------
 .../container/common/volume/MutableVolumeSet.java  |  25 +--
 .../container/common/volume/StorageVolume.java     |  55 ++++--
 .../ozone/container/keyvalue/KeyValueHandler.java  |  18 +-
 .../keyvalue/helpers/KeyValueContainerUtil.java    | 180 +++--------------
 .../hadoop/ozone/TestHddsDatanodeService.java      |  20 +-
 .../ozone/container/common/ContainerTestUtils.java |   6 +-
 .../common/impl/TestContainerPersistence.java      | 220 ++++++++++++---------
 .../common/utils/TestStorageVolumeUtil.java        |   7 +-
 .../container/common/volume/TestHddsVolume.java    |  58 +++++-
 .../container/keyvalue/TestKeyValueHandler.java    |  15 +-
 .../ozone/container/common/TestEndPoint.java       |  14 +-
 15 files changed, 495 insertions(+), 453 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index d838839a57..06a296047d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -63,7 +63,6 @@ import 
org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
-import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.util.OzoneNetUtils;
 import org.apache.hadoop.ozone.util.ShutdownHookManager;
 import org.apache.hadoop.security.SecurityUtil;
@@ -152,25 +151,6 @@ public class HddsDatanodeService extends GenericCli 
implements ServicePlugin {
     this.args = args != null ? Arrays.copyOf(args, args.length) : null;
   }
 
-  private void cleanTmpDir() {
-    if (datanodeStateMachine != null) {
-      MutableVolumeSet volumeSet =
-          datanodeStateMachine.getContainer().getVolumeSet();
-      for (StorageVolume volume : volumeSet.getVolumesList()) {
-        if (volume instanceof HddsVolume) {
-          HddsVolume hddsVolume = (HddsVolume) volume;
-          try {
-            KeyValueContainerUtil.ContainerDeleteDirectory
-                .cleanTmpDir(hddsVolume);
-          } catch (IOException ex) {
-            LOG.error("Error while cleaning tmp delete directory " +
-                "under {}", hddsVolume.getWorkingDir(), ex);
-          }
-        }
-      }
-    }
-  }
-
   public static void main(String[] args) {
     try {
       OzoneNetUtils.disableJvmNetworkAddressCacheIfRequired(
@@ -580,8 +560,6 @@ public class HddsDatanodeService extends GenericCli 
implements ServicePlugin {
   @Override
   public void stop() {
     if (!isStopped.getAndSet(true)) {
-      // Clean <HddsVolume>/tmp/container_delete_service dir.
-      cleanTmpDir();
       if (plugins != null) {
         for (ServicePlugin plugin : plugins) {
           try {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index 2d53f2a92c..f9b0d882f0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -24,10 +24,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.ozone.OzoneConsts;
 import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
-import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -124,20 +122,7 @@ public class VersionEndpointTask implements
         boolean result = StorageVolumeUtil.checkVolume(volume,
             scmId, clusterId, configuration, LOG,
             ozoneContainer.getDbVolumeSet());
-
-        if (result) {
-          // Clean <HddsVolume>/tmp/container_delete_service dir.
-          if (volume instanceof HddsVolume) {
-            HddsVolume hddsVolume = (HddsVolume) volume;
-            try {
-              KeyValueContainerUtil.ContainerDeleteDirectory
-                  .cleanTmpDir(hddsVolume);
-            } catch (IOException ex) {
-              LOG.error("Error while cleaning tmp delete directory " +
-                  "under {}", hddsVolume.getWorkingDir(), ex);
-            }
-          }
-        } else {
+        if (!result) {
           volumeSet.failVolume(volume.getStorageDir().getPath());
         }
       }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
index 0050038bb8..10b1f0ed15 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
@@ -182,9 +182,9 @@ public final class StorageVolumeUtil {
    * Prior to SCM HA, volumes used the format {@code <volume>/hdds/<scm-id>}.
    * Post SCM HA, new volumes will use the format {@code <volume>/hdds/<cluster
    * -id>}.
-   * Existing volumes using SCM ID would have been reformatted to have {@code
+   * Existing volumes using SCM ID will be reformatted to have {@code
    * <volume>/hdds/<cluster-id>} as a symlink pointing to {@code <volume
-   * >/hdds/<scm-id>}.
+   * >/hdds/<scm-id>} when SCM HA is finalized.
    *
    * @param volume
    * @param scmId
@@ -199,8 +199,9 @@ public final class StorageVolumeUtil {
       MutableVolumeSet dbVolumeSet) {
     File volumeRoot = volume.getStorageDir();
     String volumeRootPath = volumeRoot.getPath();
-    File clusterDir = new File(volumeRoot, clusterId);
+    File clusterIDDir = new File(volumeRoot, clusterId);
 
+    // Create the volume's version file if necessary.
     try {
       volume.format(clusterId);
     } catch (IOException ex) {
@@ -210,44 +211,73 @@ public final class StorageVolumeUtil {
     }
 
     File[] rootFiles = volumeRoot.listFiles();
+    // This will either be cluster ID or SCM ID, depending on if SCM HA is
+    // finalized yet or not.
+    String workingDirName = clusterId;
+    boolean success = true;
 
     if (rootFiles == null) {
       // This is the case for IOException, where listFiles returns null.
       // So, we fail the volume.
-      return false;
+      success = false;
     } else if (rootFiles.length == 1) {
-      // DN started for first time or this is a newly added volume.
       // The one file is the version file.
-      // So we create cluster ID directory, or SCM ID directory if
-      // pre-finalized for SCM HA.
+      // DN started for first time or this is a newly added volume.
       // Either the SCM ID or cluster ID will be used in naming the
-      // volume's subdirectory, depending on the datanode's layout version.
-      String id = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(conf,
-          scmId, clusterId);
+      // volume's working directory, depending on the datanode's layout 
version.
+      workingDirName = VersionedDatanodeFeatures.ScmHA
+          .chooseContainerPathID(conf, scmId, clusterId);
       try {
-        volume.createWorkingDir(id, dbVolumeSet);
+        volume.createWorkingDir(workingDirName, dbVolumeSet);
       } catch (IOException e) {
         logger.error("Prepare working dir failed for volume {}.",
             volumeRootPath, e);
-        return false;
+        success = false;
       }
-      return true;
     } else if (rootFiles.length == 2) {
-      // If we are finalized for SCM HA and there is no cluster ID directory,
-      // the volume may have been unhealthy during finalization and been
-      // skipped. Create cluster ID symlink now.
-      // Else, We are still pre-finalized.
-      // The existing directory should be left for backwards compatibility.
-      return VersionedDatanodeFeatures.ScmHA.
+      // The two files are the version file and an existing working directory.
+      // If the working directory matches the cluster ID, we do not need to
+      // do extra steps.
+      // If the working directory matches the SCM ID and SCM HA has been
+      // finalized, the volume may have been unhealthy during finalization and
+      // been skipped. In that case create the cluster ID symlink now.
+      // If the working directory matches the SCM ID and SCM HA is not yet
+      // finalized, use that as the working directory.
+      success = VersionedDatanodeFeatures.ScmHA.
           upgradeVolumeIfNeeded(volume, clusterId);
+      try {
+        workingDirName = VersionedDatanodeFeatures.ScmHA
+            .chooseContainerPathID(volume, clusterId);
+      } catch (IOException ex) {
+        success = false;
+      }
     } else {
-      if (!clusterDir.exists()) {
+      // If there are more files in this directory, we only care that a
+      // working directory named after our cluster ID is present for us to use.
+      // Any existing SCM ID directory should be left for backwards
+      // compatibility.
+      if (clusterIDDir.exists()) {
+        workingDirName = clusterId;
+      } else {
         logger.error("Volume {} is in an inconsistent state. {} files found " +
-            "but cluster ID directory {} does not exist.", volumeRootPath,
-            rootFiles.length, clusterDir);
-        return false;
+                "but cluster ID directory {} does not exist.", volumeRootPath,
+            rootFiles.length, clusterIDDir);
+        success = false;
+      }
+    }
+
+    // Once the correct working directory name is identified, create the
+    // volume level tmp directories under it.
+    if (success) {
+      try {
+        volume.createTmpDirs(workingDirName);
+      } catch (IOException e) {
+        logger.error("Prepare tmp dir failed for volume {}.",
+            volumeRootPath, e);
+        success = false;
       }
-      return true;
     }
+
+    return success;
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index b440e3fc43..bd840ba300 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -20,24 +20,29 @@ package org.apache.hadoop.ozone.container.common.volume;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.utils.RawDB;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import 
org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
 import org.slf4j.Logger;
@@ -75,9 +80,8 @@ public class HddsVolume extends StorageVolume {
   private static final Logger LOG = LoggerFactory.getLogger(HddsVolume.class);
 
   public static final String HDDS_VOLUME_DIR = "hdds";
-  private static final Path TMP_DIR = Paths.get("tmp");
-  private static final Path TMP_DELETE_SERVICE_DIR =
-      Paths.get("container_delete_service");
+  public static final String TMP_CONTAINER_DELETE_DIR_NAME =
+      "deleted-containers";
 
   private final VolumeIOStats volumeIOStats;
   private final VolumeInfoMetrics volumeInfoMetrics;
@@ -93,8 +97,7 @@ public class HddsVolume extends StorageVolume {
   // container db path. This is initialized only once together with dbVolume,
   // and stored as a member to prevent spawning lots of File objects.
   private File dbParentDir;
-  private Path tmpDirPath;
-  private Path deleteServiceDirPath;
+  private File deletedContainerDir;
   private AtomicBoolean dbLoaded = new AtomicBoolean(false);
 
   /**
@@ -144,11 +147,9 @@ public class HddsVolume extends StorageVolume {
   }
 
   @Override
-  public void createWorkingDir(String workingDirName,
-      MutableVolumeSet dbVolumeSet) throws IOException {
-    super.createWorkingDir(workingDirName, dbVolumeSet);
-
-    createTmpDir();
+  public void createWorkingDir(String dirName, MutableVolumeSet dbVolumeSet)
+      throws IOException {
+    super.createWorkingDir(dirName, dbVolumeSet);
 
     // Create DB store for a newly formatted volume
     if (VersionedDatanodeFeatures.isFinalized(
@@ -157,6 +158,14 @@ public class HddsVolume extends StorageVolume {
     }
   }
 
+  @Override
+  public void createTmpDirs(String workDirName) throws IOException {
+    super.createTmpDirs(workDirName);
+    deletedContainerDir =
+        createTmpSubdirIfNeeded(TMP_CONTAINER_DELETE_DIR_NAME);
+    cleanDeletedContainerDir();
+  }
+
   public File getHddsRootDir() {
     return super.getStorageDir();
   }
@@ -192,6 +201,112 @@ public class HddsVolume extends StorageVolume {
       volumeInfoMetrics.unregister();
     }
     closeDbStore();
+    cleanDeletedContainerDir();
+  }
+
+  /**
+   * Delete all files under
+   * <volume>/hdds/<cluster-id>/tmp/deleted-containers.
+   * This is the directory where containers are moved when they are deleted
+   * from the system, but before being removed from the filesystem. This
+   * makes the deletion atomic.
+   */
+  public void cleanDeletedContainerDir() {
+    // If the volume was shut down before initialization completed, skip
+    // emptying the directory.
+    if (deletedContainerDir == null) {
+      return;
+    }
+
+    if (!deletedContainerDir.exists()) {
+      LOG.warn("Unable to clear deleted containers from {}. Directory does " +
+          "not exist.", deletedContainerDir);
+      return;
+    }
+
+    if (!deletedContainerDir.isDirectory()) {
+      LOG.warn("Unable to clear deleted containers from {}. Location is not a" 
+
+          " directory", deletedContainerDir);
+      return;
+    }
+
+    File[] containerDirs = deletedContainerDir.listFiles(File::isDirectory);
+    if (containerDirs == null) {
+      // Either directory does not exist or IO error. Either way we cannot
+      // proceed with deletion.
+      LOG.warn("Failed to clear container delete directory {}. Directory " +
+          "could not be accessed.", deletedContainerDir);
+      return;
+    }
+
+    for (File containerDir: containerDirs) {
+      // Check the case where we have Schema V3 and
+      // removing the container's entries from RocksDB fails.
+      // --------------------------------------------
+      // On datanode restart, we populate the container set
+      // based on the available datanode volumes and
+      // populate the container metadata based on the values in RocksDB.
+      // The container is in the tmp directory,
+      // so it won't be loaded in the container set
+      // but there will be orphaned entries in the volume's RocksDB.
+      // --------------------------------------------
+      // For every .container file we find under /tmp,
+      // we use it to get the RocksDB entries and delete them.
+      // If the .container file doesn't exist then the contents of the
+      // directory are probably leftovers of a failed delete and
+      // the RocksDB entries must have already been removed.
+      // In any case we can proceed with deleting the directory's contents.
+      // --------------------------------------------
+      // Get container file and check Schema version. If Schema is V3
+      // then remove the container from RocksDB.
+      File containerFile = ContainerUtils.getContainerFile(containerDir);
+
+      if (containerFile.exists()) {
+        KeyValueContainerData keyValueContainerData;
+        try {
+          ContainerData containerData =
+              ContainerDataYaml.readContainerFile(containerFile);
+          keyValueContainerData = (KeyValueContainerData) containerData;
+        } catch (IOException ex) {
+          LOG.warn("Failed to read container file {}. Container cannot be " +
+              "removed from the delete directory.", containerFile, ex);
+          continue;
+        }
+
+        if (keyValueContainerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
+          // Container file doesn't include the volume
+          // so we need to set it here in order to get the DB location.
+          keyValueContainerData.setVolume(this);
+          File dbFile = KeyValueContainerLocationUtil
+              .getContainerDBFile(keyValueContainerData);
+          keyValueContainerData.setDbFile(dbFile);
+          try {
+            // Remove container from Rocks DB
+            BlockUtils.removeContainerFromDB(keyValueContainerData, getConf());
+          } catch (IOException ex) {
+            LOG.warn("Failed to remove container data from DB while" +
+                    "deleting container {}. Container cannot be removed from" +
+                    "the delete directory {}.",
+                keyValueContainerData.getContainerID(),
+                deletedContainerDir, ex);
+            continue;
+          }
+        }
+      }
+
+      // If the container file was already deleted, the RocksDB entries were
+      // cleared.
+      try {
+        if (containerDir.isDirectory()) {
+          FileUtils.deleteDirectory(containerDir);
+        } else {
+          FileUtils.delete(containerDir);
+        }
+      } catch (IOException ex) {
+        LOG.warn("Failed to remove container directory {}.",
+            deletedContainerDir, ex);
+      }
+    }
   }
 
   @Override
@@ -244,12 +359,8 @@ public class HddsVolume extends StorageVolume {
     return this.dbParentDir;
   }
 
-  public Path getTmpDirPath() {
-    return this.tmpDirPath;
-  }
-
-  public Path getDeleteServiceDirPath() {
-    return this.deleteServiceDirPath;
+  public File getDeletedContainerDir() {
+    return this.deletedContainerDir;
   }
 
   public boolean isDbLoaded() {
@@ -312,19 +423,19 @@ public class HddsVolume extends StorageVolume {
   public void createDbStore(MutableVolumeSet dbVolumeSet) throws IOException {
     DbVolume chosenDbVolume = null;
     File clusterIdDir;
-    String workingDir = getWorkingDir() == null ? getClusterID() :
-        getWorkingDir();
+    String workingDirName = getWorkingDirName() == null ? getClusterID() :
+        getWorkingDirName();
 
     if (dbVolumeSet == null || dbVolumeSet.getVolumesList().isEmpty()) {
       // No extra db volumes specified, just create db under the HddsVolume.
-      clusterIdDir = new File(getStorageDir(), workingDir);
+      clusterIdDir = new File(getStorageDir(), workingDirName);
     } else {
       // Randomly choose a DbVolume for simplicity.
       List<DbVolume> dbVolumeList = StorageVolumeUtil.getDbVolumesList(
           dbVolumeSet.getVolumesList());
       chosenDbVolume = dbVolumeList.get(
           ThreadLocalRandom.current().nextInt(dbVolumeList.size()));
-      clusterIdDir = new File(chosenDbVolume.getStorageDir(), workingDir);
+      clusterIdDir = new File(chosenDbVolume.getStorageDir(), workingDirName);
     }
 
     if (!clusterIdDir.exists()) {
@@ -369,62 +480,6 @@ public class HddsVolume extends StorageVolume {
     }
   }
 
-  private String getIdDir() throws IOException {
-    try {
-      return VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
-          this, getClusterID() == null ? "" : getClusterID());
-    } catch (IOException ex) {
-      String errMsg = "Cannot resolve volume container path " +
-          "ID dir for volume {} " + getStorageID() +
-          " when creating volume tmp dir";
-      LOG.error(errMsg, ex);
-      throw new IOException(errMsg, ex);
-    }
-  }
-
-  private Path createTmpPath() throws IOException {
-
-    // HddsVolume root directory path
-    String hddsRoot = getHddsRootDir().toString();
-
-    // HddsVolume path
-    String vol = HddsVolumeUtil.getHddsRoot(hddsRoot);
-
-    Path volPath = Paths.get(vol);
-    Path idPath = Paths.get(getIdDir());
-
-    return volPath.resolve(idPath).resolve(TMP_DIR);
-  }
-
-  private void createTmpDir() throws IOException {
-    tmpDirPath = createTmpPath();
-
-    if (Files.notExists(tmpDirPath)) {
-      try {
-        Files.createDirectories(tmpDirPath);
-      } catch (IOException ex) {
-        LOG.error("Error creating {}", tmpDirPath.toString(), ex);
-      }
-    }
-  }
-
-  public void createDeleteServiceDir() throws IOException {
-    if (tmpDirPath == null) {
-      createTmpDir();
-    }
-
-    deleteServiceDirPath =
-        tmpDirPath.resolve(TMP_DELETE_SERVICE_DIR);
-
-    if (Files.notExists(deleteServiceDirPath)) {
-      try {
-        Files.createDirectories(deleteServiceDirPath);
-      } catch (IOException ex) {
-        LOG.error("Error creating {}", deleteServiceDirPath.toString(), ex);
-      }
-    }
-  }
-
   private void closeDbStore() {
     if (!dbLoaded.get()) {
       return;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
index a78735286a..02f4c6f101 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
@@ -199,10 +199,7 @@ public class MutableVolumeSet implements VolumeSet {
     checkAllVolumes();
 
     // Ensure volume threads are stopped and scm df is saved during shutdown.
-    shutdownHook = () -> {
-      saveVolumeSetUsed();
-    };
-    ShutdownHookManager.get().addShutdownHook(shutdownHook,
+    ShutdownHookManager.get().addShutdownHook(this::shutdown,
         SHUTDOWN_HOOK_PRIORITY);
   }
 
@@ -416,29 +413,19 @@ public class MutableVolumeSet implements VolumeSet {
   }
 
   /**
-   * This method, call shutdown on each volume to shutdown volume usage
-   * thread and write scmUsed on each volume.
+   * Shutdown the volumeset.
    */
-
-  private synchronized void saveVolumeSetUsed() {
-    for (StorageVolume hddsVolume : volumeMap.values()) {
+  public void shutdown() {
+    for (StorageVolume volume : volumeMap.values()) {
       try {
-        hddsVolume.shutdown();
+        volume.shutdown();
       } catch (Exception ex) {
-        LOG.error("Failed to shutdown volume : " + hddsVolume.getStorageDir(),
-            ex);
+        LOG.error("Failed to shutdown volume : " + volume.getStorageDir(), ex);
       }
     }
     volumeMap.clear();
   }
 
-  /**
-   * Shutdown the volumeset.
-   */
-  public void shutdown() {
-    saveVolumeSetUsed();
-  }
-
   @Override
   @VisibleForTesting
   public List<StorageVolume> getVolumesList() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index 1b514056a4..bd79c740bd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
@@ -63,6 +64,9 @@ public abstract class StorageVolume
   private static final Logger LOG =
       LoggerFactory.getLogger(StorageVolume.class);
 
+  // The name of the directory used for temporary files on the volume.
+  public static final String TMP_DIR_NAME = "tmp";
+
   /**
    * Type for StorageVolume.
    */
@@ -105,12 +109,13 @@ public abstract class StorageVolume
   private ConfigurationSource conf;
 
   private final File storageDir;
+  private String workingDirName;
+  private File tmpDir;
 
   private final Optional<VolumeInfo> volumeInfo;
 
   private final VolumeSet volumeSet;
 
-  private String workingDir;
 
   protected StorageVolume(Builder<?> b) throws IOException {
     if (!b.failedVolume) {
@@ -180,19 +185,43 @@ public abstract class StorageVolume
   }
 
   /**
-   * Create working directory for cluster io loads.
-   * @param workingDirName scmID or clusterID according to SCM HA config
-   * @param dbVolumeSet optional dbVolumes
+   * Create the working directory for the volume at
+   * <volume>/<hdds>/<workingDirName>.
+   * Creates necessary subdirectories of the working directory as well. This
+   * includes the tmp directory at <volume>/<hdds>/<workingDirName>/tmp.
+   * Child classes may override this method to add volume specific
+   * subdirectories, but they should call the parent method first to make
+   * sure initial directories are constructed.
+   *
+   * @param dirName scmID or clusterID according to SCM HA
+   *    layout feature upgrade finalization status.
    * @throws IOException
    */
-  public void createWorkingDir(String workingDirName,
-      MutableVolumeSet dbVolumeSet) throws IOException {
-    File idDir = new File(getStorageDir(), workingDirName);
-    if (!idDir.mkdir()) {
+  public void createWorkingDir(String dirName, MutableVolumeSet dbVolumeSet)
+      throws IOException {
+    File idDir = new File(getStorageDir(), dirName);
+    if (!idDir.exists() && !idDir.mkdir()) {
       throw new IOException("Unable to create ID directory " + idDir +
           " for datanode.");
     }
-    this.workingDir = workingDirName;
+    this.workingDirName = dirName;
+  }
+
+  public void createTmpDirs(String workDirName) throws IOException {
+    this.tmpDir =
+        new File(new File(getStorageDir(), workDirName), TMP_DIR_NAME);
+    Files.createDirectories(tmpDir.toPath());
+  }
+
+  /**
+   * Create a subdirectory within this volume's tmp directory.
+   * This subdirectory can be used as a work space for temporary filesystem
+   * operations before they are moved to their final destination.
+   */
+  protected final File createTmpSubdirIfNeeded(String name) throws IOException 
{
+    File newDir = new File(tmpDir, name);
+    Files.createDirectories(newDir.toPath());
+    return newDir;
   }
 
   private VolumeState analyzeVolumeState() {
@@ -392,8 +421,12 @@ public abstract class StorageVolume
     return this.storageDir;
   }
 
-  public String getWorkingDir() {
-    return this.workingDir;
+  public String getWorkingDirName() {
+    return this.workingDirName;
+  }
+
+  public File getTmpDir() {
+    return this.tmpDir;
   }
 
   public void refreshVolumeInfo() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 18b415bceb..5bfb77a4bc 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1324,13 +1324,13 @@ public class KeyValueHandler extends Handler {
         HddsVolume hddsVolume = keyValueContainerData.getVolume();
 
         // Rename container location
-        boolean success = KeyValueContainerUtil.ContainerDeleteDirectory
-            .moveToTmpDeleteDirectory(keyValueContainerData, hddsVolume);
-
-        if (!success) {
-          LOG.error("Failed to move container under " +
-              hddsVolume.getDeleteServiceDirPath());
-          throw new StorageContainerException("Moving container failed",
+        try {
+          KeyValueContainerUtil.moveToDeletedContainerDir(
+              keyValueContainerData, hddsVolume);
+        } catch (IOException ex) {
+          LOG.error("Failed to move deleted container under {}",
+              hddsVolume.getDeletedContainerDir(), ex);
+          throw new StorageContainerException("Moving deleted container 
failed",
               CONTAINER_INTERNAL_ERROR);
         }
 
@@ -1339,8 +1339,8 @@ public class KeyValueHandler extends Handler {
               .getContainerPath();
           File containerDir = new File(containerPath);
 
-          LOG.debug("Container {} has been successfuly moved under {}",
-              containerDir.getName(), hddsVolume.getDeleteServiceDirPath());
+          LOG.debug("Container {} has been successfully moved under {}",
+              containerDir.getName(), hddsVolume.getDeletedContainerDir());
         }
       }
       long containerId = container.getContainerData().getContainerID();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 547e0144a9..1b714dacbf 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -23,10 +23,7 @@ import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.ListIterator;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
@@ -37,14 +34,11 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 import com.google.common.base.Preconditions;
@@ -163,7 +157,7 @@ public final class KeyValueContainerUtil {
       } catch (IOException ex) {
         LOG.error("DB failure, unable to remove container. " +
             "Disk need to be replaced.", ex);
-        throw new IOException(ex);
+        throw ex;
       }
     } else {
       // Close the DB connection and remove the DB handler from cache
@@ -470,8 +464,9 @@ public final class KeyValueContainerUtil {
   }
 
   /**
-   * Utilities for container_delete_service directory
-   * which is located under <volume>/hdds/<cluster-id>/tmp/.
+   * Moves container directory to a new location
+   * under "<volume>/hdds/<cluster-id>/tmp/deleted-containers"
+   * and updates metadata and chunks path.
    * Containers will be moved under it before getting deleted
    * to avoid, in case of failure, having artifact leftovers
    * on the default container path on the disk.
@@ -486,148 +481,37 @@ public final class KeyValueContainerUtil {
    * 2. Container is removed from in memory container set.
    * 3. Container's entries are removed from RocksDB.
    * 4. Container is deleted from tmp directory.
+   *
+   * @param keyValueContainerData
+   * @return true if renaming was successful
    */
-  public static class ContainerDeleteDirectory {
-
-    /**
-     * Delete all files under
-     * <volume>/hdds/<cluster-id>/tmp/container_delete_service.
-     */
-    public static synchronized void cleanTmpDir(HddsVolume hddsVolume)
-        throws IOException {
-      if (hddsVolume.getStorageState() != StorageVolume.VolumeState.NORMAL) {
-        LOG.debug("Call to clean tmp dir container_delete_service directory "
-                + "for {} while VolumeState {}",
-            hddsVolume.getStorageDir(),
-            hddsVolume.getStorageState().toString());
-        return;
-      }
-
-      // getDeleteLeftovers initializes tmp and
-      // delete service directories as well
-      ListIterator<File> leftoversListIt = getDeleteLeftovers(hddsVolume);
-
-      while (leftoversListIt.hasNext()) {
-        File file = leftoversListIt.next();
-
-        // Check the case where we have Schema V3 and
-        // removing the container's entries from RocksDB fails.
-        // --------------------------------------------
-        // On datanode restart, we populate the container set
-        // based on the available datanode volumes and
-        // populate the container metadata based on the values in RocksDB.
-        // The container is in the tmp directory,
-        // so it won't be loaded in the container set
-        // but there will be orphaned entries in the volume's RocksDB.
-        // --------------------------------------------
-        // For every .container file we find under /tmp,
-        // we use it to get the RocksDB entries and delete them.
-        // If the .container file doesn't exist then the contents of the
-        // directory are probably leftovers of a failed delete and
-        // the RocksDB entries must have already been removed.
-        // In any case we can proceed with deleting the directory's contents.
-        // --------------------------------------------
-        // Get container file and check Schema version. If Schema is V3
-        // then remove the container from RocksDB.
-        File containerFile = ContainerUtils.getContainerFile(file);
-        
-        ContainerData containerData = ContainerDataYaml
-            .readContainerFile(containerFile);
-        KeyValueContainerData keyValueContainerData =
-            (KeyValueContainerData) containerData;
-
-        if (keyValueContainerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
-          // Container file doesn't include the volume
-          // so we need to set it here in order to get the db file.
-          keyValueContainerData.setVolume(hddsVolume);
-          File dbFile = KeyValueContainerLocationUtil
-              .getContainerDBFile(keyValueContainerData);
-          keyValueContainerData.setDbFile(dbFile);
-          try {
-            // Remove container from Rocks DB
-            BlockUtils.removeContainerFromDB(keyValueContainerData,
-                hddsVolume.getConf());
-          } catch (IOException ex) {
-            LOG.error("Failed to remove container from Rocks DB", ex);
-          }
-        }
-
-        try {
-          if (file.isDirectory()) {
-            FileUtils.deleteDirectory(file);
-          } else {
-            FileUtils.delete(file);
-          }
-        } catch (IOException ex) {
-          LOG.error("Failed to delete directory or file inside " +
-              "{}", hddsVolume.getDeleteServiceDirPath().toString(), ex);
-        }
-      }
-    }
-
-    /**
-     * In the future might be used to gather metrics
-     * for the files left under
-     * <volume>/hdds/<cluster-id>/tmp/container_delete_service.
-     * @return ListIterator to all the files
-     */
-    public static ListIterator<File> getDeleteLeftovers(HddsVolume hddsVolume)
-        throws IOException {
-      List<File> leftovers = new ArrayList<>();
-
-      // Initialize tmp and delete service directories
-      hddsVolume.createDeleteServiceDir();
-
-      File tmpDir = hddsVolume.getDeleteServiceDirPath().toFile();
-
-      if (tmpDir.exists() && tmpDir.isDirectory()) {
-        File[] tmpFiles = tmpDir.listFiles();
-        if (tmpFiles != null) {
-          leftovers.addAll(Arrays.asList(tmpFiles));
-        }
-      }
-
-      return leftovers.listIterator();
+  public static void moveToDeletedContainerDir(
+      KeyValueContainerData keyValueContainerData,
+      HddsVolume hddsVolume) throws IOException {
+    String containerPath = keyValueContainerData.getContainerPath();
+    File container = new File(containerPath);
+    String containerDirName = container.getName();
+
+    Path destinationDirPath = hddsVolume.getDeletedContainerDir().toPath()
+        .resolve(Paths.get(containerDirName));
+    File destinationDirFile = destinationDirPath.toFile();
+
+    // If a container by the same name was moved to the delete directory but
+    // the final delete failed, clear it out before adding another container
+    // with the same name.
+    if (destinationDirFile.exists()) {
+      FileUtils.deleteDirectory(destinationDirFile);
     }
 
-    /**
-     * Renaming container directory path to a new location
-     * under "<volume>/hdds/<cluster-id>/tmp/container_delete_service"
-     * and updating metadata and chunks path.
-     * @param keyValueContainerData
-     * @return true if renaming was successful
-     */
-    public static boolean moveToTmpDeleteDirectory(
-        KeyValueContainerData keyValueContainerData,
-        HddsVolume hddsVolume) {
-      String containerPath = keyValueContainerData.getContainerPath();
-      File container = new File(containerPath);
-      String containerDirName = container.getName();
-
-      // Initialize delete directory
-      try {
-        hddsVolume.createDeleteServiceDir();
-      } catch (IOException ex) {
-        LOG.error("Error initializing container-service tmp dir");
-        return false;
-      }
-
-      String destinationDirPath = hddsVolume.getDeleteServiceDirPath()
-          .resolve(Paths.get(containerDirName)).toString();
-
-      boolean success = container.renameTo(new File(destinationDirPath));
+    Files.move(container.toPath(), destinationDirPath);
 
-      // Updating in memory values of the container's location
-      // which is used by KeyValueContainerUtil#removeContainer().
-      // In case of a datanode restart these values won't persist but
-      // tmp delete directory will be wiped, so this won't be an issue.
-      if (success) {
-        keyValueContainerData.setMetadataPath(destinationDirPath +
-            OZONE_URI_DELIMITER + OzoneConsts.CONTAINER_META_PATH);
-        keyValueContainerData.setChunksPath(destinationDirPath +
-            OZONE_URI_DELIMITER + OzoneConsts.STORAGE_DIR_CHUNKS);
-      }
-      return success;
-    }
+    // Updating in memory values of the container's location
+    // which is used by KeyValueContainerUtil#removeContainer().
+    // In case of a datanode restart these values won't persist but
+    // tmp delete directory will be wiped, so this won't be an issue.
+    keyValueContainerData.setMetadataPath(destinationDirPath +
+        OZONE_URI_DELIMITER + OzoneConsts.CONTAINER_META_PATH);
+    keyValueContainerData.setChunksPath(destinationDirPath +
+        OZONE_URI_DELIMITER + OzoneConsts.STORAGE_DIR_CHUNKS);
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsDatanodeService.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsDatanodeService.java
index 87dc68383b..01188b6799 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsDatanodeService.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsDatanodeService.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.UUID;
 
 import org.apache.hadoop.fs.FileUtil;
@@ -38,7 +37,6 @@ import 
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
-import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.util.ServicePlugin;
 
@@ -132,7 +130,8 @@ public class TestHddsDatanodeService {
   @ParameterizedTest
   @ValueSource(strings = {OzoneConsts.SCHEMA_V1,
       OzoneConsts.SCHEMA_V2, OzoneConsts.SCHEMA_V3})
-  public void testTmpDirOnShutdown(String schemaVersion) throws IOException {
+  public void testDeletedContainersClearedOnShutdown(String schemaVersion)
+      throws IOException {
     ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
     LOG.info("SchemaV3_enabled: " +
         conf.get(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED));
@@ -155,20 +154,23 @@ public class TestHddsDatanodeService {
         clusterId, conf, LOG, null);
     // Create a container and move it under the tmp delete dir.
     KeyValueContainer container = ContainerTestUtils
-        .setUpTestContainerUnderTmpDir(
+        .addContainerToDeletedDir(
             hddsVolume, clusterId, conf, schemaVersion);
     assertTrue(container.getContainerFile().exists());
     assertTrue(container.getContainerDBFile().exists());
+    File[] deletedContainersAfterShutdown =
+        hddsVolume.getDeletedContainerDir().listFiles();
+    assertNotNull(deletedContainersAfterShutdown);
+    assertEquals(1, deletedContainersAfterShutdown.length);
 
     service.stop();
     service.join();
     service.close();
 
-    ListIterator<File> deleteLeftoverIt = KeyValueContainerUtil
-        .ContainerDeleteDirectory.getDeleteLeftovers(hddsVolume);
-    assertFalse(deleteLeftoverIt.hasNext());
-
-    volumeSet.shutdown();
+    deletedContainersAfterShutdown =
+        hddsVolume.getDeletedContainerDir().listFiles();
+    assertNotNull(deletedContainersAfterShutdown);
+    assertEquals(0, deletedContainersAfterShutdown.length);
   }
 
   static class MockService implements ServicePlugin {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index 3e0ddcd67f..71310e3032 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -184,7 +184,7 @@ public final class ContainerTestUtils {
     when(c.getContainerData().getVolume()).thenReturn(vol);
   }
 
-  public static KeyValueContainer setUpTestContainerUnderTmpDir(
+  public static KeyValueContainer addContainerToDeletedDir(
       HddsVolume volume, String clusterId,
       OzoneConfiguration conf, String schemaVersion)
       throws IOException {
@@ -209,8 +209,8 @@ public final class ContainerTestUtils {
     // For testing, we are moving the container
     // under the tmp directory, in order to delete
     // it from there, during datanode startup or shutdown
-    KeyValueContainerUtil.ContainerDeleteDirectory
-        .moveToTmpDeleteDirectory(keyValueContainerData, volume);
+    KeyValueContainerUtil
+        .moveToDeletedContainerDir(keyValueContainerData, volume);
 
     return container;
   }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index e0cebfb812..8be60132be 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -24,17 +24,18 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
-import java.util.ListIterator;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -56,6 +57,7 @@ import 
org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
@@ -84,7 +86,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 
-import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 import static 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeFalse;
@@ -164,7 +165,13 @@ public class TestContainerPersistence {
     containerSet = new ContainerSet(1000);
     volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
-    createDbInstancesForTestIfNeeded(volumeSet, SCM_ID, SCM_ID, conf);
+    // Initialize volume directories.
+    for (HddsVolume volume : StorageVolumeUtil.getHddsVolumesList(
+        volumeSet.getVolumesList())) {
+      boolean success = StorageVolumeUtil.checkVolume(volume, SCM_ID, SCM_ID,
+          conf, null, null);
+      Assert.assertTrue(success);
+    }
     blockManager = new BlockManagerImpl(conf);
     chunkManager = ChunkManagerFactory.createChunkManager(conf, blockManager,
         null);
@@ -336,39 +343,81 @@ public class TestContainerPersistence {
     long testContainerID = getTestContainerID();
     Container<KeyValueContainerData> container = addContainer(containerSet,
         testContainerID);
-    BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
-    ChunkInfo info = writeChunkHelper(blockID);
-    BlockData blockData = new BlockData(blockID);
-    List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
-    chunkList.add(info.getProtoBufMessage());
-    blockData.setChunks(chunkList);
-
-    blockManager.putBlock(container, blockData);
+    BlockID blockID = addBlockToContainer(container);
     container.close();
     Assert.assertTrue(containerSet.getContainerMapCopy()
         .containsKey(testContainerID));
     KeyValueContainerData containerData = container.getContainerData();
 
+    // Block data and metadata tables should have data.
+    assertContainerInSchema3DB(containerData, blockID);
+
+    container.delete();
+    containerSet.removeContainer(testContainerID);
+    Assert.assertFalse(containerSet.getContainerMapCopy()
+        .containsKey(testContainerID));
+
+    // Block data and metadata tables should be cleared.
+    assertContainerNotInSchema3DB(containerData, blockID);
+  }
+
+  private void assertContainerInSchema3DB(KeyValueContainerData containerData,
+      BlockID testBlock) throws IOException {
+    // This test is only valid for schema v3. Earlier schemas will have their
+    // own RocksDB that is deleted with the container.
+    if (!containerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
+      return;
+    }
     try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf)) {
       DatanodeStoreSchemaThreeImpl store = (DatanodeStoreSchemaThreeImpl)
           dbHandle.getStore();
       Table<String, BlockData> blockTable = store.getBlockDataTable();
+      Table<String, Long> metadataTable = store.getMetadataTable();
 
-      // Key should exist in Block table.
+      // Block data and metadata tables should have data.
       Assertions.assertNotNull(blockTable
-          .getIfExist(containerData.getBlockKey(blockID.getLocalID())));
+          .getIfExist(containerData.getBlockKey(testBlock.getLocalID())));
+      Assertions.assertNotNull(metadataTable
+          .getIfExist(containerData.getBlockCountKey()));
+    }
+  }
 
-      container.delete();
-      containerSet.removeContainer(testContainerID);
-      Assert.assertFalse(containerSet.getContainerMapCopy()
-          .containsKey(testContainerID));
+  private void assertContainerNotInSchema3DB(
+      KeyValueContainerData containerData, BlockID testBlock)
+      throws IOException {
+    // This test is only valid for schema v3. Earlier schemas will have their
+    // own RocksDB that is deleted with the container.
+    if (!containerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
+      return;
+    }
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf)) {
+      DatanodeStoreSchemaThreeImpl store = (DatanodeStoreSchemaThreeImpl)
+          dbHandle.getStore();
+      Table<String, BlockData> blockTable = store.getBlockDataTable();
+      Table<String, Long> metadataTable = store.getMetadataTable();
 
-      // Key should not exist in Block table, after container gets deleted.
+      // Block data and metadata tables should have data.
       Assertions.assertNull(blockTable
-          .getIfExist(containerData.getBlockKey(blockID.getLocalID())));
+          .getIfExist(containerData.getBlockKey(testBlock.getLocalID())));
+      Assertions.assertNull(metadataTable
+          .getIfExist(containerData.getBlockCountKey()));
     }
   }
 
+  private BlockID addBlockToContainer(
+      Container<KeyValueContainerData> container) throws IOException {
+    BlockID blockID = ContainerTestHelper.getTestBlockID(
+            container.getContainerData().getContainerID());
+    ChunkInfo info = writeChunkHelper(blockID);
+    BlockData blockData = new BlockData(blockID);
+    List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+    chunkList.add(info.getProtoBufMessage());
+    blockData.setChunks(chunkList);
+    blockManager.putBlock(container, blockData);
+
+    return blockID;
+  }
+
   /**
    * If SchemaV3 is enabled, HddsVolume has already been
    * formatted and initialized in
@@ -378,97 +427,75 @@ public class TestContainerPersistence {
   @Test
   public void testDeleteContainerWithRenaming()
       throws Exception {
-    HddsVolume hddsVolume;
-    // If !SchemaV3, build hddsVolume
-    if (!isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V3)) {
-      Files.createDirectories(Paths.get(hddsPath));
-
-      HddsVolume.Builder volumeBuilder =
-          new HddsVolume.Builder(hddsPath)
-          .datanodeUuid(DATANODE_UUID)
-          .conf(conf)
-          .usageCheckFactory(MockSpaceUsageCheckFactory.NONE);
-
-      hddsVolume = volumeBuilder.build();
-
-      hddsVolume.format(SCM_ID);
-      hddsVolume.createWorkingDir(SCM_ID, null);
-    }
-
+    // Set up container 1.
     long testContainerID1 = getTestContainerID();
-    Thread.sleep(100);
-    long testContainerID2 = getTestContainerID();
-
     Container<KeyValueContainerData> container1 =
         addContainer(containerSet, testContainerID1);
+    BlockID container1Block = addBlockToContainer(container1);
     container1.close();
+    KeyValueContainerData container1Data = container1.getContainerData();
+    assertContainerInSchema3DB(container1Data, container1Block);
 
+    // Set up container 2.
+    long testContainerID2 = getTestContainerID();
     Container<KeyValueContainerData> container2 =
         addContainer(containerSet, testContainerID2);
+    BlockID container2Block = addBlockToContainer(container2);
     container2.close();
+    KeyValueContainerData container2Data = container2.getContainerData();
+    assertContainerInSchema3DB(container2Data, container2Block);
 
     Assert.assertTrue(containerSet.getContainerMapCopy()
         .containsKey(testContainerID1));
     Assert.assertTrue(containerSet.getContainerMapCopy()
         .containsKey(testContainerID2));
 
-    KeyValueContainerData container1Data = container1.getContainerData();
-
-    KeyValueContainerData container2Data = container2.getContainerData();
-
-    hddsVolume = container1Data.getVolume();
-
-    // Rename container1 dir
-    Assert.assertTrue(KeyValueContainerUtil.ContainerDeleteDirectory
-        .moveToTmpDeleteDirectory(container1Data, hddsVolume));
-
-    // Rename container2 dir
-    Assert.assertTrue(KeyValueContainerUtil.ContainerDeleteDirectory
-        .moveToTmpDeleteDirectory(container2Data, hddsVolume));
-
-    File container1File =
-        new File(container1Data.getContainerPath());
-
-    File container2File =
-        new File(container2Data.getContainerPath());
-
-    ListIterator<File> tmpDirIter = KeyValueContainerUtil
-        .ContainerDeleteDirectory.getDeleteLeftovers(hddsVolume);
-    List<File> tmpDirFileList = new LinkedList<>();
-    boolean container1ExistsUnderTmpDir = false;
-    boolean container2ExistsUnderTmpDir = false;
-
-    while (tmpDirIter.hasNext()) {
-      tmpDirFileList.add(tmpDirIter.next());
-    }
-
-    if (tmpDirFileList.contains(container1File)) {
-      container1ExistsUnderTmpDir = true;
-    }
-
-    if (tmpDirFileList.contains(container2File)) {
-      container2ExistsUnderTmpDir = true;
-    }
-
-    Assert.assertTrue(container1ExistsUnderTmpDir);
-    Assert.assertTrue(container2ExistsUnderTmpDir);
-
-    // Delete container1
+    // Since this test only uses one volume, both containers will reside in
+    // the same volume.
+    HddsVolume hddsVolume = container1Data.getVolume();
+    // Volume setup should have created the tmp directory for container
+    // deletion.
+    File volumeTmpDir = hddsVolume.getTmpDir();
+    Assert.assertTrue(String.format("Volume level tmp dir %s not created.",
+            volumeTmpDir), volumeTmpDir.exists());
+    File deletedContainerDir = hddsVolume.getDeletedContainerDir();
+    Assert.assertTrue(String.format("Volume level container deleted directory" 
+
+        " %s not created.", deletedContainerDir), 
deletedContainerDir.exists());
+
+    // Move containers to delete directory. RocksDB should not yet be updated.
+    KeyValueContainerUtil.moveToDeletedContainerDir(container1Data, 
hddsVolume);
+    assertContainerInSchema3DB(container1Data, container1Block);
+    KeyValueContainerUtil.moveToDeletedContainerDir(container2Data, 
hddsVolume);
+    assertContainerInSchema3DB(container2Data, container2Block);
+
+    // Both containers should be present in the deleted directory.
+    File[] deleteDirFilesArray = deletedContainerDir.listFiles();
+    Assert.assertNotNull(deleteDirFilesArray);
+    Set<File> deleteDirFiles = Arrays.stream(deleteDirFilesArray)
+        .collect(Collectors.toSet());
+    Assert.assertEquals(2, deleteDirFiles.size());
+
+    File container1Dir = new File(container1Data.getContainerPath());
+    Assert.assertTrue(deleteDirFiles.contains(container1Dir));
+    File container2Dir = new File(container2Data.getContainerPath());
+    Assert.assertTrue(deleteDirFiles.contains(container2Dir));
+
+    // Delete container1 from the disk. Container2 should remain in the
+    // deleted containers directory.
     container1.delete();
+    assertContainerNotInSchema3DB(container1Data, container1Block);
+    assertContainerInSchema3DB(container2Data, container2Block);
 
-    Assert.assertTrue(KeyValueContainerUtil.ContainerDeleteDirectory
-        .getDeleteLeftovers(hddsVolume).hasNext());
-
-    ListIterator<File> iterator = KeyValueContainerUtil
-        .ContainerDeleteDirectory.getDeleteLeftovers(hddsVolume);
-
-    File metadata2Dir = container2.getContainerFile().getParentFile();
-    File container2Dir = metadata2Dir.getParentFile();
-
-    Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(container2Dir, iterator.next());
+    // Check the delete directory again. Only container 2 should remain.
+    deleteDirFilesArray = deletedContainerDir.listFiles();
+    Assert.assertNotNull(deleteDirFilesArray);
+    Assert.assertEquals(1, deleteDirFilesArray.length);
+    Assert.assertEquals(deleteDirFilesArray[0], container2Dir);
 
+    // Delete container2 from the disk.
     container2.delete();
+    assertContainerNotInSchema3DB(container1Data, container1Block);
+    assertContainerNotInSchema3DB(container2Data, container2Block);
 
     // Remove containers from containerSet
     containerSet.removeContainer(testContainerID1);
@@ -478,9 +505,10 @@ public class TestContainerPersistence {
     Assert.assertFalse(containerSet.getContainerMapCopy()
         .containsKey(testContainerID2));
 
-    // 'tmp/delete_container_service' is empty
-    Assert.assertFalse(KeyValueContainerUtil.ContainerDeleteDirectory
-        .getDeleteLeftovers(hddsVolume).hasNext());
+    // Deleted containers directory should now be empty.
+    deleteDirFilesArray = deletedContainerDir.listFiles();
+    Assert.assertNotNull(deleteDirFilesArray);
+    Assert.assertEquals(0, deleteDirFilesArray.length);
   }
 
   @Test
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestStorageVolumeUtil.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestStorageVolumeUtil.java
index b7f1397ab3..d8bab45662 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestStorageVolumeUtil.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestStorageVolumeUtil.java
@@ -27,6 +27,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -46,6 +48,9 @@ public class TestStorageVolumeUtil {
   @Rule
   public final TemporaryFolder folder = new TemporaryFolder();
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStorageVolumeUtil.class);
+
   private static final String DATANODE_UUID = UUID.randomUUID().toString();
   private static final String CLUSTER_ID = UUID.randomUUID().toString();
   private static final OzoneConfiguration CONF = new OzoneConfiguration();
@@ -90,7 +95,7 @@ public class TestStorageVolumeUtil {
 
     // checkVolume for the 2nd time: rootFiles.length == 2
     res = StorageVolumeUtil.checkVolume(spyHddsVolume, CLUSTER_ID,
-        CLUSTER_ID, CONF, null, dbVolumeSet);
+        CLUSTER_ID, CONF, LOG, dbVolumeSet);
     assertTrue(res);
 
     // should only call createDbStore once, so no dup db instance
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java
index 93373e4188..53da489a3a 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.volume;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
@@ -103,15 +104,62 @@ public class TestHddsVolume {
     assertEquals(CLUSTER_ID, volume.getClusterID());
     assertEquals(HddsVolume.VolumeState.NORMAL, volume.getStorageState());
 
-    // Create a working directory
-    // tmp directory should be initialized.
+    // Shutdown the volume.
+    volume.shutdown();
+  }
+
+  @Test
+  public void testCreateVolumeTmpDirs() throws Exception {
+    // Set up volume.
+    HddsVolume volume = volumeBuilder.build();
+    volume.format(CLUSTER_ID);
     volume.createWorkingDir(CLUSTER_ID, null);
+    volume.createTmpDirs(CLUSTER_ID);
 
-    File tmpDir = new File(volume.getTmpDirPath().toString());
-    assertTrue(tmpDir.exists());
+    // All temp directories should have been created.
+    assertTrue(volume.getTmpDir().exists());
+    assertTrue(volume.getDeletedContainerDir().exists());
+
+    volume.shutdown();
+    // tmp directories should still exist after shutdown. This is not
+    // checking their contents.
+    assertTrue(volume.getTmpDir().exists());
+    assertTrue(volume.getDeletedContainerDir().exists());
+  }
+
+  @Test
+  public void testClearVolumeTmpDirs() throws Exception {
+    // Set up volume.
+    HddsVolume volume = volumeBuilder.build();
+    volume.format(CLUSTER_ID);
+
+    File tmpDir = volume.getHddsRootDir().toPath()
+        .resolve(Paths.get(CLUSTER_ID, StorageVolume.TMP_DIR_NAME)).toFile();
+    File tmpDeleteDir = new File(tmpDir,
+        HddsVolume.TMP_CONTAINER_DELETE_DIR_NAME);
+    // Simulate a container that failed to delete fully from the deleted
+    // containers directory.
+    File leftoverContainer = new File(tmpDeleteDir, "1");
+    assertTrue(leftoverContainer.mkdirs());
+
+    // Check that tmp dirs are created with expected names.
+    volume.createWorkingDir(CLUSTER_ID, null);
+    volume.createTmpDirs(CLUSTER_ID);
+    assertEquals(tmpDir, volume.getTmpDir());
+    assertEquals(tmpDeleteDir, volume.getDeletedContainerDir());
+
+    // Cleanup should have removed the partial container without removing the
+    // delete directory itself.
+    assertFalse(leftoverContainer.exists());
+    assertTrue(tmpDeleteDir.exists());
+
+    // Re-create the partial container.
+    assertTrue(leftoverContainer.mkdir());
 
-    // Shutdown the volume.
     volume.shutdown();
+    // It should be cleared again on shutdown.
+    assertFalse(leftoverContainer.exists());
+    assertTrue(tmpDeleteDir.exists());
   }
 
   @Test
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index f8454da881..a841735cd7 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -50,25 +50,25 @@ import 
org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.ozone.test.GenericTestUtils;
 
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestRule;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import org.mockito.Mockito;
 import static org.mockito.Mockito.doCallRealMethod;
@@ -369,6 +369,7 @@ public class TestKeyValueHandler {
           .build();
       hddsVolume.format(clusterId);
       hddsVolume.createWorkingDir(clusterId, null);
+      hddsVolume.createTmpDirs(clusterId);
 
       Mockito.when(volumeSet.getVolumesList())
           .thenReturn(Collections.singletonList(hddsVolume));
@@ -376,7 +377,7 @@ public class TestKeyValueHandler {
       List<HddsVolume> hddsVolumeList = StorageVolumeUtil
           .getHddsVolumesList(volumeSet.getVolumesList());
 
-      assertTrue(hddsVolumeList.size() == 1);
+      assertEquals(1, hddsVolumeList.size());
 
       final ContainerMetrics metrics = ContainerMetrics.create(conf);
 
@@ -407,8 +408,10 @@ public class TestKeyValueHandler {
       Assert.assertEquals(2, icrReceived.get());
       Assert.assertNull(containerSet.getContainer(containerID));
 
-      assertFalse(KeyValueContainerUtil.ContainerDeleteDirectory
-          .getDeleteLeftovers(hddsVolume).hasNext());
+      File[] deletedContainers =
+          hddsVolume.getDeletedContainerDir().listFiles();
+      assertNotNull(deletedContainers);
+      Assertions.assertEquals(0, deletedContainers.length);
     } finally {
       FileUtils.deleteDirectory(new File(testDir));
     }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index b8eb7b0cb1..62f4f116e2 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -62,7 +62,6 @@ import 
org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -171,7 +170,10 @@ public class TestEndPoint {
    * that they have been deleted during tmp dir cleanup.
    */
   @Test
-  public void testTmpDirCleanup() throws Exception {
+  public void testDeletedContainersClearedOnStartup() throws Exception {
+    // TODO HDDS-8770. If cleanup of RocksDB is no longer required on tmp dir
+    //  cleanup, this test can be removed.
+
     OzoneConfiguration conf = SCMTestUtils.getConf();
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
         true);
@@ -201,7 +203,7 @@ public class TestEndPoint {
       // Write some data before calling versionTask.call()
       // Create a container and move it under the tmp delete dir.
       KeyValueContainer container = ContainerTestUtils.
-          setUpTestContainerUnderTmpDir(hddsVolume, clusterId,
+          addContainerToDeletedDir(hddsVolume, clusterId,
               conf, OzoneConsts.SCHEMA_V3);
       File containerDBFile = container.getContainerDBFile();
 
@@ -237,8 +239,10 @@ public class TestEndPoint {
             newState);
 
         // assert that tmp dir is empty
-        Assertions.assertFalse(KeyValueContainerUtil.ContainerDeleteDirectory
-            .getDeleteLeftovers(hddsVolume).hasNext());
+        File[] leftoverContainers =
+            hddsVolume.getDeletedContainerDir().listFiles();
+        Assertions.assertNotNull(leftoverContainers);
+        Assertions.assertEquals(0, leftoverContainers.length);
         // All DB keys have been deleted, MetadataTable should be empty
         Assertions.assertEquals(0, metadataTable.getEstimatedKeyCount());
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to