This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c8804ac9b HDDS-8115. Do not use block count to determine if containers are empty. (#4655)
3c8804ac9b is described below

commit 3c8804ac9b2d7bc543fc6cf317c6998713ebcb90
Author: ashishkumar50 <[email protected]>
AuthorDate: Fri Jun 16 21:18:20 2023 +0530

    HDDS-8115. Do not use block count to determine if containers are empty. (#4655)
---
 .../container/common/helpers/ContainerMetrics.java |  17 +-
 .../ozone/container/common/impl/ContainerData.java |  15 ++
 .../container/common/interfaces/Container.java     |   6 +-
 .../container/keyvalue/KeyValueContainer.java      |  19 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  66 +----
 .../keyvalue/helpers/KeyValueContainerUtil.java    |  52 +++-
 .../background/BlockDeletingService.java           |  12 +
 .../container/keyvalue/TestKeyValueContainer.java  | 106 ++++++++
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |   1 +
 .../container/AbstractContainerReportHandler.java  |   2 +
 .../hdds/scm/container/ContainerReplica.java       |  23 +-
 .../replication/LegacyReplicationManager.java      |   8 +-
 .../replication/health/EmptyContainerHandler.java  |  11 +-
 .../org/apache/hadoop/hdds/scm/HddsTestUtils.java  |   3 +-
 .../container/replication/ReplicationTestUtil.java |   1 +
 .../replication/TestLegacyReplicationManager.java  |   2 +-
 .../commandhandler/TestBlockDeletion.java          | 276 ++++++++++++++++++++-
 .../commandhandler/TestDeleteContainerHandler.java | 237 +++++++++++-------
 18 files changed, 663 insertions(+), 194 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index e717da65ff..fc19375189 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -49,10 +49,9 @@ public class ContainerMetrics {
   public static final String STORAGE_CONTAINER_METRICS =
       "StorageContainerMetrics";
   @Metric private MutableCounterLong numOps;
-  @Metric private MutableCounterLong containerDeleteFailedNonEmptyDir;
+  @Metric private MutableCounterLong containerDeleteFailedNonEmpty;
   @Metric private MutableCounterLong containerDeleteFailedBlockCountNotZero;
   @Metric private MutableCounterLong containerForceDelete;
-  @Metric private MutableCounterLong containerDeleteFailedNonEmptyBlockDB;
 
   private MutableCounterLong[] numOpsArray;
   private MutableCounterLong[] opsBytesArray;
@@ -135,15 +134,15 @@ public class ContainerMetrics {
     containerDeleteFailedBlockCountNotZero.incr();
   }
   public void incContainerDeleteFailedNonEmpty() {
-    containerDeleteFailedNonEmptyDir.incr();
+    containerDeleteFailedNonEmpty.incr();
   }
 
   public void incContainersForceDelete() {
     containerForceDelete.incr();
   }
 
-  public long getContainerDeleteFailedNonEmptyDir() {
-    return containerDeleteFailedNonEmptyDir.value();
+  public long getContainerDeleteFailedNonEmpty() {
+    return containerDeleteFailedNonEmpty.value();
   }
 
   public long getContainerDeleteFailedBlockCountNotZero() {
@@ -153,12 +152,4 @@ public class ContainerMetrics {
   public long getContainerForceDelete() {
     return containerForceDelete.value();
   }
-
-  public void incContainerDeleteFailedNonEmptyBlocksDB() {
-    containerDeleteFailedNonEmptyBlockDB.incr();
-  }
-
-  public long getContainerDeleteFailedNonEmptyBlockDB() {
-    return containerDeleteFailedNonEmptyBlockDB.value();
-  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index dbbd457447..54fca5a61e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -101,6 +101,8 @@ public abstract class ContainerData {
 
   private String checksum;
 
+  private boolean isEmpty;
+
   /** Timestamp of last data scan (milliseconds since Unix Epoch).
    * {@code null} if not yet scanned (or timestamp not recorded,
    * eg. in prior versions). */
@@ -154,6 +156,7 @@ public abstract class ContainerData {
     this.maxSize = size;
     this.originPipelineId = originPipelineId;
     this.originNodeId = originNodeId;
+    this.isEmpty = false;
     setChecksumTo0ByteArray();
   }
 
@@ -537,6 +540,18 @@ public abstract class ContainerData {
     return this.blockCount.get();
   }
 
+  public boolean isEmpty() {
+    return isEmpty;
+  }
+
+  /**
+   * Indicates that this container has no more data, and is eligible for
+   * deletion. Once this flag is set on a container, it cannot leave this state.
+   */
+  public void markAsEmpty() {
+    this.isEmpty = true;
+  }
+
   /**
    * Set's number of blocks in the container.
    * @param count
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index ef7c4f7420..9dfd94dcb4 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -58,11 +58,11 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
   void delete() throws StorageContainerException;
 
   /**
-   * Returns true if container is empty.
-   * @return true of container is empty
+   * Returns true if container has some block.
+   * @return true if container has some block.
    * @throws IOException if was unable to check container status.
    */
-  boolean isEmpty() throws IOException;
+  boolean hasBlocks() throws IOException;
 
   /**
    * Update the container.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 1900d28b29..776d63b704 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -108,6 +109,8 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
   // container are synchronous.
   private Set<Long> pendingPutBlockCache;
 
+  private final boolean bCheckChunksFilePath;
+
   public KeyValueContainer(KeyValueContainerData containerData,
       ConfigurationSource ozoneConfig) {
     Preconditions.checkNotNull(containerData,
@@ -123,6 +126,12 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
     } else {
       this.pendingPutBlockCache = Collections.emptySet();
     }
+    bCheckChunksFilePath =
+        ozoneConfig.getBoolean(
+            DatanodeConfiguration.
+                OZONE_DATANODE_CHECK_EMPTY_CONTAINER_ON_DISK_ON_DELETE,
+            DatanodeConfiguration.
+                OZONE_DATANODE_CHECK_EMPTY_CONTAINER_ON_DISK_ON_DELETE_DEFAULT);
   }
 
   @Override
@@ -311,8 +320,11 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
   }
 
   @Override
-  public boolean isEmpty() throws IOException {
-    return KeyValueContainerUtil.noBlocksInContainer(containerData);
+  public boolean hasBlocks() throws IOException {
+    try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+      return !KeyValueContainerUtil.noBlocksInContainer(db.getStore(),
+          containerData, bCheckChunksFilePath);
+    }
   }
 
   @Override
@@ -802,7 +814,8 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
         .setReplicaIndex(containerData.getReplicaIndex())
         .setDeleteTransactionId(containerData.getDeleteTransactionId())
         .setBlockCommitSequenceId(containerData.getBlockCommitSequenceId())
-        .setOriginNodeId(containerData.getOriginNodeId());
+        .setOriginNodeId(containerData.getOriginNodeId())
+        .setIsEmpty(containerData.isEmpty());
     return ciBuilder.build();
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 24fefc8d4c..18b415bceb 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -135,8 +135,6 @@ public class KeyValueHandler extends Handler {
   private final long maxContainerSize;
   private final Function<ByteBuffer, ByteString> byteBufferToByteString;
   private final boolean validateChunkChecksumData;
-  private final boolean checkIfNoBlockFiles;
-
   // A striped lock that is held during container creation.
   private final Striped<Lock> containerCreationLocks;
 
@@ -160,13 +158,6 @@ public class KeyValueHandler extends Handler {
       throw new RuntimeException(e);
     }
 
-    checkIfNoBlockFiles =
-        conf.getBoolean(
-            DatanodeConfiguration.
-                OZONE_DATANODE_CHECK_EMPTY_CONTAINER_ON_DISK_ON_DELETE,
-        DatanodeConfiguration.
-            OZONE_DATANODE_CHECK_EMPTY_CONTAINER_ON_DISK_ON_DELETE_DEFAULT);
-
     maxContainerSize = (long) config.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
@@ -1309,64 +1300,21 @@ public class KeyValueHandler extends Handler {
         // If the container is not empty, it should not be deleted unless the
         // container is being forcefully deleted (which happens when
         // container is unhealthy or over-replicated).
-        if (container.getContainerData().getBlockCount() != 0) {
-          metrics.incContainerDeleteFailedBlockCountNotZero();
+        if (container.hasBlocks()) {
+          metrics.incContainerDeleteFailedNonEmpty();
           LOG.error("Received container deletion command for container {} but" +
                   " the container is not empty with blockCount {}",
               container.getContainerData().getContainerID(),
               container.getContainerData().getBlockCount());
+          // blocks table for future debugging.
+          // List blocks
+          logBlocksIfNonZero(container);
+          // Log chunks
+          logBlocksFoundOnDisk(container);
           throw new StorageContainerException("Non-force deletion of " +
               "non-empty container is not allowed.",
               DELETE_ON_NON_EMPTY_CONTAINER);
         }
-
-        // This is a defensive check to make sure there is no data loss if
-        // 1. There are one or more blocks on the filesystem
-        // 2. There are one or more blocks in the block table
-        // This can lead to false positives as
-        // 1. Chunks written to disk that did not get recorded in RocksDB can
-        //    occur due to failures during write
-        // 2. Blocks that were deleted from blocks table but the deletion of
-        //    the underlying file could not be completed
-        // 3. Failures between files being deleted from disk but not being
-        //    cleaned up.
-        // 4. Bugs in the code.
-        // Blocks stored on disk represent data written by a client and should
-        // be treated with care at the expense of creating artifacts on disk
-        // that  might be unreferenced.
-        // https://issues.apache.org/jira/browse/HDDS-8138 will move the
-        // implementation to only depend on consistency of the chunks folder
-
-        // First check if any files are in the chunks folder. If there are
-        // to help with debugging also dump the blocks table data.
-        if (checkIfNoBlockFiles) {
-          if (!container.isEmpty()) {
-            metrics.incContainerDeleteFailedNonEmpty();
-            logBlocksFoundOnDisk(container);
-            logBlocksIfNonZero(container);
-            // List Blocks from Blocks Table
-            throw new StorageContainerException("Non-force deletion of " +
-                "non-empty container dir:" +
-                container.getContainerData().getContainerID() +
-                " is not allowed.",
-                DELETE_ON_NON_EMPTY_CONTAINER);
-          }
-
-          // The chunks folder is empty, not check if the blocks table has any
-          // blocks still referenced. This will avoid cleaning up the
-          // blocks table for future debugging.
-          // List rocks
-          if (logBlocksIfNonZero(container)) {
-            LOG.error("Non-empty blocks table for container {}",
-                container.getContainerData().getContainerID());
-            metrics.incContainerDeleteFailedNonEmptyBlocksDB();
-            throw new StorageContainerException("Non-force deletion of " +
-                "non-empty container block table:" +
-                container.getContainerData().getContainerID() +
-                " is not allowed.",
-                DELETE_ON_NON_EMPTY_CONTAINER);
-          }
-        }
       } else {
         metrics.incContainersForceDelete();
       }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 8853497e2c..6486143af5 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -181,21 +182,37 @@ public final class KeyValueContainerUtil {
 
   /**
    * Returns if there are no blocks in the container.
+   * @param store DBStore
    * @param containerData Container to check
+   * @param bCheckChunksFilePath Whether to check chunksfilepath has any blocks
    * @return true if the directory containing blocks is empty
    * @throws IOException
    */
-  public static boolean noBlocksInContainer(KeyValueContainerData
-                                                containerData)
+  public static boolean noBlocksInContainer(DatanodeStore store,
+                                            KeyValueContainerData
+                                            containerData,
+                                            boolean bCheckChunksFilePath)
       throws IOException {
+    Preconditions.checkNotNull(store);
     Preconditions.checkNotNull(containerData);
-    File chunksPath = new File(containerData.getChunksPath());
-    Preconditions.checkArgument(chunksPath.isDirectory());
-
-    try (DirectoryStream<Path> dir
-             = Files.newDirectoryStream(chunksPath.toPath())) {
-      return !dir.iterator().hasNext();
+    if (containerData.isOpen()) {
+      return false;
+    }
+    try (BlockIterator<BlockData> blockIterator =
+             store.getBlockIterator(containerData.getContainerID())) {
+      if (blockIterator.hasNext()) {
+        return false;
+      }
     }
+    if (bCheckChunksFilePath) {
+      File chunksPath = new File(containerData.getChunksPath());
+      Preconditions.checkArgument(chunksPath.isDirectory());
+      try (DirectoryStream<Path> dir
+               = Files.newDirectoryStream(chunksPath.toPath())) {
+        return !dir.iterator().hasNext();
+      }
+    }
+    return true;
   }
 
   /**
@@ -230,9 +247,16 @@ public final class KeyValueContainerUtil {
     }
     kvContainerData.setDbFile(dbFile);
 
+    boolean bCheckChunksFilePath =
+        config.getBoolean(
+            DatanodeConfiguration.
+              OZONE_DATANODE_CHECK_EMPTY_CONTAINER_ON_DISK_ON_DELETE,
+            DatanodeConfiguration.
+              OZONE_DATANODE_CHECK_EMPTY_CONTAINER_ON_DISK_ON_DELETE_DEFAULT);
     if (kvContainerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
       try (DBHandle db = BlockUtils.getDB(kvContainerData, config)) {
-        populateContainerMetadata(kvContainerData, db.getStore());
+        populateContainerMetadata(kvContainerData,
+            db.getStore(), bCheckChunksFilePath);
       }
       return;
     }
@@ -255,7 +279,7 @@ public final class KeyValueContainerUtil {
             "instance was retrieved from the cache. This should only happen " +
             "in tests");
       }
-      populateContainerMetadata(kvContainerData, store);
+      populateContainerMetadata(kvContainerData, store, bCheckChunksFilePath);
     } finally {
       if (cachedDB != null) {
         // If we get a cached instance, calling close simply decrements the
@@ -277,7 +301,8 @@ public final class KeyValueContainerUtil {
   }
 
   private static void populateContainerMetadata(
-      KeyValueContainerData kvContainerData, DatanodeStore store)
+      KeyValueContainerData kvContainerData, DatanodeStore store,
+      boolean bCheckChunksFilePath)
       throws IOException {
     boolean isBlockMetadataSet = false;
     Table<String, Long> metadataTable = store.getMetadataTable();
@@ -342,6 +367,11 @@ public final class KeyValueContainerUtil {
     if (!chunksDir.exists()) {
       Files.createDirectories(chunksDir.toPath());
     }
+
+    if (noBlocksInContainer(store, kvContainerData, bCheckChunksFilePath)) {
+      kvContainerData.markAsEmpty();
+    }
+
     // Run advanced container inspection/repair operations if specified on
     // startup. If this method is called but not as a part of startup,
     // The inspectors will be unloaded and this will be a no-op.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index deee5610b3..c7cf4667c6 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -396,6 +396,12 @@ public class BlockDeletingService extends BackgroundService {
           int deletedBlocksCount = succeedBlocks.size();
           containerData.updateAndCommitDBCounters(meta, batch,
               deletedBlocksCount, releasedBytes);
+          // Once DB update is persisted, check if there are any blocks
+          // remaining in the DB. This will determine whether the container
+          // can be deleted by SCM.
+          if (!container.hasBlocks()) {
+            containerData.markAsEmpty();
+          }
 
           // update count of pending deletion blocks, block count and used
           // bytes in in-memory container status.
@@ -530,6 +536,12 @@ public class BlockDeletingService extends 
BackgroundService {
           // batched together while committing to DB.
           containerData.updateAndCommitDBCounters(meta, batch,
               deletedBlocksCount, releasedBytes);
+          // Once DB update is persisted, check if there are any blocks
+          // remaining in the DB. This will determine whether the container
+          // can be deleted by SCM.
+          if (!container.hasBlocks()) {
+            containerData.markAsEmpty();
+          }
 
           // update count of pending deletion blocks, block count and used
           // bytes in in-memory container status and used space in volume.
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 723d4a786e..713936ed58 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -377,6 +377,23 @@ public class TestKeyValueContainer {
     populate(keyValueContainer, numberOfKeysToWrite);
   }
 
+  private void populateWithoutBlock(KeyValueContainer container,
+                                    long numberOfKeysToWrite)
+      throws IOException {
+    KeyValueContainerData cData = container.getContainerData();
+    try (DBHandle metadataStore = BlockUtils.getDB(cData, CONF)) {
+      // Just update metdata, and don't insert in block table
+      // As for test, we are doing manually so adding key count to DB.
+      metadataStore.getStore().getMetadataTable()
+          .put(cData.getBlockCountKey(), numberOfKeysToWrite);
+    }
+
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put("key1", "value1");
+    container.update(metadata, true);
+  }
+
+
   /**
    * Set container state to CLOSED.
    */
@@ -769,4 +786,93 @@ public class TestKeyValueContainer {
       }
     }
   }
+
+  @Test
+  public void testIsEmptyContainerStateWhileImport() throws Exception {
+    long containerId = keyValueContainer.getContainerData().getContainerID();
+    createContainer();
+    long numberOfKeysToWrite = 1;
+    closeContainer();
+    populate(numberOfKeysToWrite);
+
+    //destination path
+    File folderToExport = folder.newFile("export.tar");
+    for (CopyContainerCompression compr : CopyContainerCompression.values()) {
+      TarContainerPacker packer = new TarContainerPacker(compr);
+
+      //export the container
+      try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
+        keyValueContainer
+            .exportContainerData(fos, packer);
+      }
+
+      //delete the original one
+      keyValueContainer.delete();
+
+      //create a new one
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerId,
+              keyValueContainerData.getLayoutVersion(),
+              keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
+              datanodeId.toString());
+      containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
+      KeyValueContainer container = new KeyValueContainer(containerData, CONF);
+
+      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
+          StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
+
+      container.populatePathFields(scmId, containerVolume);
+      try (FileInputStream fis = new FileInputStream(folderToExport)) {
+        container.importContainerData(fis, packer);
+      }
+
+      // After import check whether isEmpty flag is false
+      Assert.assertFalse(container.getContainerData().isEmpty());
+    }
+  }
+
+  @Test
+  public void testIsEmptyContainerStateWhileImportWithoutBlock()
+      throws Exception {
+    long containerId = keyValueContainer.getContainerData().getContainerID();
+    createContainer();
+    long numberOfKeysToWrite = 1;
+    closeContainer();
+    populateWithoutBlock(keyValueContainer, numberOfKeysToWrite);
+
+    //destination path
+    File folderToExport = folder.newFile("export.tar");
+    for (CopyContainerCompression compr : CopyContainerCompression.values()) {
+      TarContainerPacker packer = new TarContainerPacker(compr);
+
+      //export the container
+      try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
+        keyValueContainer
+            .exportContainerData(fos, packer);
+      }
+
+      //delete the original one
+      keyValueContainer.delete();
+      //create a new one
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerId,
+              keyValueContainerData.getLayoutVersion(),
+              keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
+              datanodeId.toString());
+      containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
+      KeyValueContainer container = new KeyValueContainer(containerData, CONF);
+
+      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
+          StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
+
+      container.populatePathFields(scmId, containerVolume);
+      try (FileInputStream fis = new FileInputStream(folderToExport)) {
+        container.importContainerData(fis, packer);
+      }
+
+      // After import check whether isEmpty flag is true
+      // since there are no blocks in rocksdb
+      Assert.assertTrue(container.getContainerData().isEmpty());
+    }
+  }
 }
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 665665ee38..de9e39789b 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -233,6 +233,7 @@ message ContainerReplicaProto {
   optional uint64 blockCommitSequenceId = 12;
   optional string originNodeId = 13;
   optional int32 replicaIndex = 14;
+  optional bool isEmpty = 15 [default = false];
 }
 
 message CommandStatusReportsProto {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index dd1758b466..004e32a2f2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -159,6 +159,7 @@ public class AbstractContainerReportHandler {
         getOtherReplicas(containerInfo.containerID(), newSource);
     long usedBytes = newReplica.getUsed();
     long keyCount = newReplica.getKeyCount();
+
     for (ContainerReplica r : otherReplicas) {
       usedBytes = calculateUsage(containerInfo, usedBytes, r.getBytesUsed());
       keyCount = calculateUsage(containerInfo, keyCount, r.getKeyCount());
@@ -378,6 +379,7 @@ public class AbstractContainerReportHandler {
         .setKeyCount(replicaProto.getKeyCount())
         .setReplicaIndex(replicaProto.getReplicaIndex())
         .setBytesUsed(replicaProto.getUsed())
+        .setEmpty(replicaProto.getIsEmpty())
         .build();
 
     if (replica.getState().equals(State.DELETED)) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index ab67bff0c7..56da38ed57 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -42,8 +42,9 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
   private Long sequenceId;
   private final long keyCount;
   private final long bytesUsed;
+  private final boolean isEmpty;
 
-
+  @SuppressWarnings("parameternumber")
   private ContainerReplica(
       final ContainerID containerID,
       final ContainerReplicaProto.State state,
@@ -51,7 +52,8 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
       final DatanodeDetails datanode,
       final UUID originNodeId,
       long keyNum,
-      long dataSize) {
+      long dataSize,
+      boolean isEmpty) {
     this.containerID = containerID;
     this.state = state;
     this.datanodeDetails = datanode;
@@ -59,6 +61,7 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
     this.keyCount = keyNum;
     this.bytesUsed = dataSize;
     this.replicaIndex = replicaIndex;
+    this.isEmpty = isEmpty;
   }
 
   private void setSequenceId(Long seqId) {
@@ -119,6 +122,10 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
     return bytesUsed;
   }
 
+  public boolean isEmpty() {
+    return isEmpty;
+  }
+
   @Override
   public int hashCode() {
     return new HashCodeBuilder(61, 71)
@@ -172,7 +179,8 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
         .setKeyCount(keyCount)
         .setOriginNodeId(placeOfBirth)
         .setReplicaIndex(replicaIndex)
-        .setSequenceId(sequenceId);
+        .setSequenceId(sequenceId)
+        .setEmpty(isEmpty);
   }
 
   @Override
@@ -187,6 +195,7 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
         ", bytesUsed=" + bytesUsed + ((replicaIndex > 0) ?
         ",replicaIndex=" + replicaIndex :
         "") +
+        ", isEmpty=" + isEmpty +
         '}';
   }
 
@@ -203,6 +212,7 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
     private long bytesUsed;
     private long keyCount;
     private int replicaIndex;
+    private boolean isEmpty;
 
     /**
      * Set Container Id.
@@ -272,6 +282,11 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
       return this;
     }
 
+    public ContainerReplicaBuilder setEmpty(boolean empty) {
+      isEmpty = empty;
+      return this;
+    }
+
     /**
      * Constructs new ContainerReplicaBuilder.
      *
@@ -287,7 +302,7 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
       ContainerReplica replica = new ContainerReplica(
           containerID, state, replicaIndex, datanode,
           Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid()),
-          keyCount, bytesUsed);
+          keyCount, bytesUsed, isEmpty);
       Optional.ofNullable(sequenceId).ifPresent(replica::setSequenceId);
       return replica;
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 8183416f90..fb982dd063 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -931,8 +931,9 @@ public class LegacyReplicationManager {
   private boolean isContainerEmpty(final ContainerInfo container,
       final Set<ContainerReplica> replicas) {
     return container.getState() == LifeCycleState.CLOSED &&
-        container.getNumberOfKeys() == 0 && replicas.stream().allMatch(
-            r -> r.getState() == State.CLOSED && r.getKeyCount() == 0);
+        !replicas.isEmpty() &&
+        replicas.stream().allMatch(
+            r -> r.getState() == State.CLOSED && r.isEmpty());
   }
 
   /**
@@ -1024,11 +1025,10 @@ public class LegacyReplicationManager {
       InvalidStateTransitionException, TimeoutException {
     Preconditions.assertTrue(container.getState() ==
         LifeCycleState.CLOSED);
-    Preconditions.assertTrue(container.getNumberOfKeys() == 0);
 
     replicas.stream().forEach(rp -> {
       Preconditions.assertTrue(rp.getState() == State.CLOSED);
-      Preconditions.assertTrue(rp.getKeyCount() == 0);
+      Preconditions.assertTrue(rp.isEmpty());
       sendDeleteCommand(container, rp.getDatanodeDetails(), false);
     });
     containerManager.updateContainerState(container.containerID(),
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/EmptyContainerHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/EmptyContainerHandler.java
index b11bd5872f..29945a950d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/EmptyContainerHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/EmptyContainerHandler.java
@@ -88,9 +88,10 @@ public class EmptyContainerHandler extends AbstractCheck {
   private boolean isContainerEmptyAndClosed(final ContainerInfo container,
       final Set<ContainerReplica> replicas) {
     return container.getState() == HddsProtos.LifeCycleState.CLOSED &&
-        container.getNumberOfKeys() == 0 && replicas.stream()
-        .allMatch(r -> r.getState() == ContainerReplicaProto.State.CLOSED &&
-            r.getKeyCount() == 0);
+        !replicas.isEmpty() &&
+        replicas.stream().allMatch(
+            r -> r.getState() == ContainerReplicaProto.State.CLOSED &&
+                r.isEmpty());
   }
 
   /**
@@ -103,13 +104,11 @@ public class EmptyContainerHandler extends AbstractCheck {
       final Set<ContainerReplica> replicas) {
     Preconditions.assertSame(HddsProtos.LifeCycleState.CLOSED,
         containerInfo.getState(), "container state");
-    Preconditions.assertSame(0L, containerInfo.getNumberOfKeys(),
-        "key count");
 
     for (ContainerReplica rp : replicas) {
       Preconditions.assertSame(ContainerReplicaProto.State.CLOSED,
           rp.getState(), "replica state");
-      Preconditions.assertSame(0, rp.getKeyCount(), "replica key count");
+      Preconditions.assertSame(true, rp.isEmpty(), "replica empty");
 
       try {
         replicationManager.sendDeleteCommand(containerInfo,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
index a577427975..6021c33de1 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
@@ -742,7 +742,8 @@ public final class HddsTestUtils {
             .setDatanodeDetails(datanodeDetails)
             .setOriginNodeId(originNodeId).setSequenceId(sequenceId)
             .setBytesUsed(usedBytes)
-            .setKeyCount(keyCount);
+            .setKeyCount(keyCount)
+            .setEmpty(keyCount == 0);
   }
 
   public static ContainerReplica getReplicas(
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index d0516dd505..1b9cc85e2f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -160,6 +160,7 @@ public final class ReplicationTestUtil {
     builder.setDatanodeDetails(datanodeDetails);
     builder.setSequenceId(0);
     builder.setOriginNodeId(originNodeId);
+    builder.setEmpty(keyCount == 0);
     return builder.build();
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 0ccb121404..f493138e89 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -2657,7 +2657,7 @@ public class TestLegacyReplicationManager {
     Assertions.assertEquals(count, report.getStat(
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
-
+  
   private static class DatanodeCommandHandler implements
       EventHandler<CommandForDatanode> {
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 4cceb4d67b..4fd4e898f2 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -19,6 +19,7 @@ package 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import java.util.stream.Stream;
 
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -37,6 +38,10 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
 import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
 import org.apache.hadoop.hdds.scm.block.ScmBlockDeletingServiceMetrics;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
 import 
org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -49,6 +54,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -65,6 +71,7 @@ import 
org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.apache.ozone.test.tag.Flaky;
+import org.junit.Assert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -162,9 +169,13 @@ public class TestBlockDeletion {
         0,
         TimeUnit.MILLISECONDS);
     conf.setInt("hdds.datanode.block.delete.threads.max", 5);
+    ReplicationManager.ReplicationManagerConfiguration replicationConf = conf
+        .getObject(ReplicationManager.ReplicationManagerConfiguration.class);
+    replicationConf.setInterval(Duration.ofSeconds(300));
+    conf.setFromObject(replicationConf);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
-        .setHbInterval(200)
+        .setHbInterval(50)
         .build();
     cluster.waitForClusterToBeReady();
     client = cluster.newClient();
@@ -371,9 +382,9 @@ public class TestBlockDeletion {
     writeClient.deleteKey(keyArgs);
     // Wait for blocks to be deleted and container reports to be processed
     GenericTestUtils.waitFor(() ->
-        scm.getContainerManager().getContainers().stream()
-            .allMatch(c -> c.getUsedBytes() == 0 && c.getNumberOfKeys() == 0),
-        500, 5000);
+            scm.getContainerManager().getContainers().stream()
+                .allMatch(c -> c.getUsedBytes() == 0 &&
+                    c.getNumberOfKeys() == 0), 500, 5000);
     Thread.sleep(5000);
     // Verify that pending block delete num are as expected with resent cmds
     cluster.getHddsDatanodes().forEach(dn -> {
@@ -435,6 +446,263 @@ public class TestBlockDeletion {
     LOG.info(metrics.toString());
   }
 
+  @Test
+  public void testContainerStateAfterDNRestart() throws Exception {
+    ReplicationManager replicationManager = scm.getReplicationManager();
+    // DN-side isEmpty must survive restarts; SCM then marks containers DELETED.
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = RandomStringUtils.random(10 * 10);
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream out = bucket.createKey(keyName,
+        value.getBytes(UTF_8).length, ReplicationType.RATIS,
+        ReplicationFactor.THREE, new HashMap<>());
+    out.write(value.getBytes(UTF_8));
+    out.close();
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
+        .setReplicationConfig(
+            RatisReplicationConfig
+                .getInstance(HddsProtos.ReplicationFactor.THREE))
+        .build();
+    List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
+        om.lookupKey(keyArgs).getKeyLocationVersions();
+    Thread.sleep(5000);
+    List<ContainerInfo> containerInfos =
+        scm.getContainerManager().getContainers();
+    final int valueSize = value.getBytes(UTF_8).length;
+    final int keyCount = 1;
+    List<Long> containerIdList = new ArrayList<>();
+    containerInfos.stream().forEach(container -> {
+      Assertions.assertEquals(valueSize, container.getUsedBytes());
+      Assertions.assertEquals(keyCount, container.getNumberOfKeys());
+      containerIdList.add(container.getContainerID());
+    });
+
+    OzoneTestUtils.closeAllContainers(scm.getEventQueue(), scm);
+    // Wait for the containers to close
+    TestHelper.waitForContainerClose(cluster,
+        containerIdList.toArray(new Long[0]));
+    // Force the CLOSED state on datanode 0's copy of each key container
+    omKeyLocationInfoGroupList.forEach((group) -> {
+      List<OmKeyLocationInfo> locationInfo = group.getLocationList();
+      locationInfo.forEach(
+          (info) -> cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+              .getContainer().getContainerSet()
+              .getContainer(info.getContainerID()).getContainerData()
+              .setState(ContainerProtos.ContainerDataProto.State.CLOSED));
+    });
+
+    ContainerID containerId = ContainerID.valueOf(
+        containerInfos.get(0).getContainerID());
+    // Before the restart the container on datanode 0 is non-empty
+    Assertions.assertFalse(getContainerFromDN(
+        cluster.getHddsDatanodes().get(0), containerId.getId())
+        .getContainerData().isEmpty());
+    // Restart datanode 0
+    cluster.restartHddsDatanode(0, true);
+
+    // After the restart the container must still be non-empty
+    Assertions.assertFalse(getContainerFromDN(
+        cluster.getHddsDatanodes().get(0), containerId.getId())
+        .getContainerData().isEmpty());
+
+    // Delete the key
+    writeClient.deleteKey(keyArgs);
+    Thread.sleep(10000);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return scm.getContainerManager().getContainerReplicas(
+            containerId).stream().
+            allMatch(replica -> replica.isEmpty());
+      } catch (ContainerNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    },
+        100, 10 * 1000);
+
+    // The container should be empty now that the key has been deleted
+    Assertions.assertTrue(getContainerFromDN(
+        cluster.getHddsDatanodes().get(0), containerId.getId())
+        .getContainerData().isEmpty());
+
+    // Restart datanode 0 again
+    cluster.restartHddsDatanode(0, true);
+    // The container should still be empty after the restart
+    Assertions.assertTrue(getContainerFromDN(
+        cluster.getHddsDatanodes().get(0), containerId.getId())
+        .getContainerData().isEmpty());
+    // Run the replication manager until every container is DELETED in SCM
+    GenericTestUtils.waitFor(() -> {
+      replicationManager.processAll();
+      ((EventQueue)scm.getEventQueue()).processAll(1000);
+      List<ContainerInfo> infos = scm.getContainerManager().getContainers();
+      try {
+        infos.stream().forEach(container -> {
+          Assertions.assertEquals(HddsProtos.LifeCycleState.DELETED,
+              container.getState());
+          try {
+            Assertions.assertEquals(HddsProtos.LifeCycleState.DELETED,
+                scm.getScmMetadataStore().getContainerTable()
+                    .get(container.containerID()).getState());
+          } catch (IOException e) {
+            Assertions.fail(
+                "Container from SCM DB should be marked as DELETED");
+          }
+        });
+      } catch (Throwable e) {
+        LOG.info(e.getMessage());
+        return false;
+      }
+      return true;
+    }, 500, 30000);
+    LOG.info(metrics.toString());
+  }
+
+  /**
+   * Returns the container with the given ID from the DN's container set.
+   */
+  private Container getContainerFromDN(HddsDatanodeService hddsDatanodeService,
+                                       long containerID) {
+    return hddsDatanodeService.getDatanodeStateMachine().getContainer()
+        .getContainerSet().getContainer(containerID);
+  }
+
+  /**
+   * Verifies that SCM deletes a container once every replica reports isEmpty,
+   * even if one replica still carries a stale, non-zero key count.
+   */
+  @Test
+  public void testContainerDeleteWithInvalidKeyCount() throws Exception {
+    ReplicationManager replicationManager = scm.getReplicationManager();
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = RandomStringUtils.random(1024 * 1024);
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream out = bucket.createKey(keyName,
+        value.getBytes(UTF_8).length, ReplicationType.RATIS,
+        ReplicationFactor.THREE, new HashMap<>());
+    out.write(value.getBytes(UTF_8));
+    out.close();
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
+        .setReplicationConfig(
+            RatisReplicationConfig
+                .getInstance(HddsProtos.ReplicationFactor.THREE))
+        .build();
+    List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
+        om.lookupKey(keyArgs).getKeyLocationVersions();
+    Thread.sleep(5000);
+    List<ContainerInfo> containerInfos =
+        scm.getContainerManager().getContainers();
+    final int valueSize = value.getBytes(UTF_8).length;
+    final int keyCount = 1;
+    List<Long> containerIdList = new ArrayList<>();
+    containerInfos.stream().forEach(container -> {
+      Assertions.assertEquals(valueSize, container.getUsedBytes());
+      Assertions.assertEquals(keyCount, container.getNumberOfKeys());
+      containerIdList.add(container.getContainerID());
+    });
+
+    OzoneTestUtils.closeAllContainers(scm.getEventQueue(), scm);
+    // Wait for the containers to close
+    TestHelper.waitForContainerClose(cluster,
+        containerIdList.toArray(new Long[0]));
+    // Force the CLOSED state on datanode 0's copy of each key container
+    omKeyLocationInfoGroupList.forEach((group) -> {
+      List<OmKeyLocationInfo> locationInfo = group.getLocationList();
+      locationInfo.forEach(
+          (info) -> cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+              .getContainer().getContainerSet()
+              .getContainer(info.getContainerID()).getContainerData()
+              .setState(ContainerProtos.ContainerDataProto.State.CLOSED));
+    });
+
+    ContainerStateManager containerStateManager = scm.getContainerManager()
+        .getContainerStateManager();
+    ContainerID containerId = ContainerID.valueOf(
+        containerInfos.get(0).getContainerID());
+    // Get the state of all replicas from SCM
+    Set<ContainerReplica> replicas
+        = scm.getContainerManager().getContainerReplicas(containerId);
+
+    // While the key still exists, no replica may be reported as empty
+    Assert.assertTrue(replicas.stream().noneMatch(ContainerReplica::isEmpty));
+
+    // Delete the key
+    writeClient.deleteKey(keyArgs);
+    Thread.sleep(5000);
+
+    // Wait until every replica reports isEmpty after the key deletion
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return scm.getContainerManager().getContainerReplicas(
+            containerId).stream()
+            .allMatch(replica -> replica.isEmpty());
+      } catch (ContainerNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }, 500, 5 * 2000);
+
+    // Overwrite one replica in SCM with an invalid (non-zero) keyCount
+    ContainerReplica replicaOne = ContainerReplica.newBuilder()
+        .setContainerID(containerId)
+        .setKeyCount(10)
+        .setContainerState(StorageContainerDatanodeProtocolProtos
+            .ContainerReplicaProto.State.CLOSED)
+        .setDatanodeDetails(replicas.iterator().next().getDatanodeDetails())
+        .setEmpty(true)
+        .build();
+    // Update replica
+    containerStateManager.updateContainerReplica(containerId, replicaOne);
+
+    // The stale keyCount must actually be visible in SCM before we proceed
+    Assertions.assertTrue(scm.getContainerManager()
+        .getContainerReplicas(containerId).stream()
+        .anyMatch(replica -> replica.getKeyCount() == 10));
+
+    // Run container deletion in SCM and ensure the containers get deleted
+    // even though the keyCount is invalid in one of the replicas
+    GenericTestUtils.waitFor(() -> {
+      replicationManager.processAll();
+      ((EventQueue)scm.getEventQueue()).processAll(1000);
+      List<ContainerInfo> infos = scm.getContainerManager().getContainers();
+      try {
+        infos.stream().forEach(container -> {
+          Assertions.assertEquals(HddsProtos.LifeCycleState.DELETED,
+              container.getState());
+          try {
+            Assertions.assertEquals(HddsProtos.LifeCycleState.DELETED,
+                scm.getScmMetadataStore().getContainerTable()
+                    .get(container.containerID()).getState());
+          } catch (IOException e) {
+            Assertions.fail(
+                "Container from SCM DB should be marked as DELETED");
+          }
+        });
+      } catch (Throwable e) {
+        LOG.info(e.getMessage());
+        return false;
+      }
+      return true;
+    }, 500, 30000);
+  }
+
   private void verifyTransactionsCommitted() throws IOException {
     scm.getScmBlockManager().getDeletedBlockLog();
     for (long txnID = 1; txnID <= maxTransactionId; txnID++) {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
index e812510c22..f7bb6be3b1 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
@@ -19,13 +19,13 @@ package 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneTestUtils;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
@@ -44,6 +45,7 @@ import 
org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -60,9 +62,11 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Rule;
 import org.junit.rules.Timeout;
@@ -72,6 +76,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 
 /**
  * Tests DeleteContainerCommand Handler.
@@ -99,6 +104,17 @@ public class TestDeleteContainerHandler {
     conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
         0, StorageUnit.MB);
     conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
+    DatanodeConfiguration datanodeConfiguration = conf.getObject(
+        DatanodeConfiguration.class);
+    datanodeConfiguration.setBlockDeletionInterval(Duration.ofMillis(100));
+    conf.setFromObject(datanodeConfiguration);
+    ScmConfig scmConfig = conf.getObject(ScmConfig.class);
+    scmConfig.setBlockDeletionInterval(Duration.ofMillis(100));
+    conf.setFromObject(scmConfig);
+
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();
@@ -139,12 +155,6 @@ public class TestDeleteContainerHandler {
     // get containerID of the key
     ContainerID containerId = getContainerID(keyName);
 
-    ContainerInfo container = cluster.getStorageContainerManager()
-        .getContainerManager().getContainer(containerId);
-
-    Pipeline pipeline = cluster.getStorageContainerManager()
-        .getPipelineManager().getPipeline(container.getPipelineID());
-
     // We need to close the container because delete container only happens
     // on closed containers when force flag is set to false.
 
@@ -159,11 +169,31 @@ public class TestDeleteContainerHandler {
     NodeManager nodeManager =
         cluster.getStorageContainerManager().getScmNodeManager();
     //send the order to close the container
-    SCMCommand<?> command = new CloseContainerCommand(
-        containerId.getId(), pipeline.getId());
-    command.setTerm(
-        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
-    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
+    OzoneTestUtils.closeAllContainers(cluster.getStorageContainerManager()
+            .getEventQueue(), cluster.getStorageContainerManager());
+
+    ContainerMetrics metrics =
+        hddsDatanodeService
+            .getDatanodeStateMachine().getContainer().getMetrics();
+    long beforeDeleteFailedCount = metrics.getContainerDeleteFailedNonEmpty();
+    GenericTestUtils.waitFor(() ->
+            isContainerClosed(hddsDatanodeService, containerId.getId()),
+        500, 5 * 1000);
+
+    //double check if it's really closed (waitFor also throws an exception)
+    Assert.assertTrue(isContainerClosed(hddsDatanodeService,
+        containerId.getId()));
+
+    // Delete the key, which sets the isEmpty flag to true in containerData
+    objectStore.getVolume(volumeName)
+        .getBucket(bucketName).deleteKey(keyName);
+
+    // Ensure isEmpty flag is true when key is deleted and container is empty
+    GenericTestUtils.waitFor(() -> getContainerfromDN(
+        hddsDatanodeService, containerId.getId())
+        .getContainerData().isEmpty(),
+        500,
+        5 * 2000);
 
     Container containerInternalObj =
         hddsDatanodeService.
@@ -177,16 +207,6 @@ public class TestDeleteContainerHandler {
         new File(containerInternalObj.
             getContainerData().getChunksPath() + "/1.block");
     lingeringBlock.createNewFile();
-    ContainerMetrics metrics =
-        hddsDatanodeService
-            .getDatanodeStateMachine().getContainer().getMetrics();
-    GenericTestUtils.waitFor(() ->
-            isContainerClosed(hddsDatanodeService, containerId.getId()),
-        500, 5 * 1000);
-
-    //double check if it's really closed (waitFor also throws an exception)
-    Assert.assertTrue(isContainerClosed(hddsDatanodeService,
-        containerId.getId()));
 
     // Check container exists before sending delete container command
     Assert.assertFalse(isContainerDeleted(hddsDatanodeService,
@@ -197,7 +217,8 @@ public class TestDeleteContainerHandler {
         .getContainerData().setBlockCount(0);
 
     // send delete container to the datanode
-    command = new DeleteContainerCommand(containerId.getId(), false);
+    SCMCommand<?> command = new DeleteContainerCommand(containerId.getId(),
+        false);
 
     // Send the delete command. It should fail as even though block count
     // is zero there is a lingering block on disk.
@@ -216,8 +237,8 @@ public class TestDeleteContainerHandler {
         5 * 2000);
     Assert.assertTrue(!isContainerDeleted(hddsDatanodeService,
         containerId.getId()));
-    Assert.assertEquals(1,
-        metrics.getContainerDeleteFailedNonEmptyDir());
+    Assert.assertTrue(beforeDeleteFailedCount <
+        metrics.getContainerDeleteFailedNonEmpty());
     // Send the delete command. It should pass with force flag.
     // Deleting a non-empty container should pass on the DN when the force flag
     // is true
@@ -301,26 +322,13 @@ public class TestDeleteContainerHandler {
     Assert.assertFalse(isContainerDeleted(hddsDatanodeService,
         containerId.getId()));
 
-    // Set container blockCount to 0 to mock that it is empty as per RocksDB
-    getContainerfromDN(hddsDatanodeService, containerId.getId())
-        .getContainerData().setBlockCount(0);
-    // Write entries to the block Table.
-    try (DBHandle dbHandle
-             = BlockUtils.getDB(
-                 (KeyValueContainerData)getContainerfromDN(hddsDatanodeService,
-                     containerId.getId()).getContainerData(),
-        conf)) {
-      BlockData blockData = new BlockData(new BlockID(1, 1));
-      dbHandle.getStore().getBlockDataTable().put("block1", blockData);
-    }
-
-    long containerDeleteFailedNonEmptyBefore =
-        metrics.getContainerDeleteFailedNonEmptyDir();
+    long containerDeleteFailedNonEmptyBlockDB =
+        metrics.getContainerDeleteFailedNonEmpty();
     // send delete container to the datanode
     command = new DeleteContainerCommand(containerId.getId(), false);
 
-    // Send the delete command. It should fail as even though block count
-    // is zero there is a lingering block on disk.
+    // Send the delete command. It should fail as even though isEmpty
+    // flag is true, there is a lingering block on disk.
     command.setTerm(
         
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
@@ -332,15 +340,15 @@ public class TestDeleteContainerHandler {
             LoggerFactory.getLogger(KeyValueHandler.class));
     GenericTestUtils.waitFor(() ->
             logCapturer.getOutput().
-                contains("Files still part of the container on delete"),
+                contains("the container is not empty with blockCount"),
         500,
         5 * 2000);
     Assert.assertTrue(!isContainerDeleted(hddsDatanodeService,
         containerId.getId()));
-    Assert.assertTrue(containerDeleteFailedNonEmptyBefore <
-        metrics.getContainerDeleteFailedNonEmptyDir());
+    Assert.assertTrue(containerDeleteFailedNonEmptyBlockDB <
+        metrics.getContainerDeleteFailedNonEmpty());
 
-    // Now empty the container Dir and try with a non empty block table
+    // Now empty the container Dir and try with a non-empty block table
     Container containerToDelete = getContainerfromDN(
         hddsDatanodeService, containerId.getId());
     File chunkDir = new File(containerToDelete.
@@ -351,25 +359,16 @@ public class TestDeleteContainerHandler {
         FileUtils.delete(file);
       }
     }
+
     command = new DeleteContainerCommand(containerId.getId(), false);
 
-    // Send the delete command. It should fail as even though block count
-    // is zero there is a lingering block on disk.
+    // Send the delete command. It should fail as still block table is non-empty
     command.setTerm(
         cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
-
-
-    // Check the log for the error message when deleting non-empty containers
-    GenericTestUtils.waitFor(() ->
-            logCapturer.getOutput().
-                contains("Non-empty blocks table for container"),
-        500,
-        5 * 2000);
+    Thread.sleep(5000);
     Assert.assertTrue(!isContainerDeleted(hddsDatanodeService,
         containerId.getId()));
-    Assert.assertEquals(1,
-        metrics.getContainerDeleteFailedNonEmptyBlockDB());
     // Send the delete command. It should pass with force flag.
     long beforeForceCount = metrics.getContainerForceDelete();
     command = new DeleteContainerCommand(containerId.getId(), true);
@@ -387,6 +386,84 @@ public class TestDeleteContainerHandler {
         metrics.getContainerForceDelete());
   }
 
+  @Test(timeout = 60000)
+  public void testContainerDeleteWithInvalidBlockCount()
+      throws Exception {
+    String keyName = UUID.randomUUID().toString();
+    // create key
+    createKey(keyName);
+    // get containerID of the key
+    ContainerID containerId = getContainerID(keyName);
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager().getContainer(containerId);
+    Pipeline pipeline = cluster.getStorageContainerManager()
+        .getPipelineManager().getPipeline(container.getPipelineID());
+
+    // We need to close the container because delete container only happens
+    // on closed containers when force flag is set to false.
+    HddsDatanodeService hddsDatanodeService =
+        cluster.getHddsDatanodes().get(0);
+
+    Assert.assertFalse(isContainerClosed(hddsDatanodeService,
+        containerId.getId()));
+
+    DatanodeDetails datanodeDetails = hddsDatanodeService.getDatanodeDetails();
+    NodeManager nodeManager =
+        cluster.getStorageContainerManager().getScmNodeManager();
+    //send the order to close the container
+    SCMCommand<?> command = new CloseContainerCommand(
+        containerId.getId(), pipeline.getId());
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
+    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
+
+    GenericTestUtils.waitFor(() ->
+            isContainerClosed(hddsDatanodeService, containerId.getId()),
+        500, 5 * 1000);
+
+    //double check if it's really closed (waitFor also throws an exception)
+    Assert.assertTrue(isContainerClosed(hddsDatanodeService,
+        containerId.getId()));
+
+    // Check container exists before sending delete container command
+    Assert.assertFalse(isContainerDeleted(hddsDatanodeService,
+        containerId.getId()));
+
+    // Clear block table
+    clearBlocksTable(getContainerfromDN(hddsDatanodeService,
+        containerId.getId()));
+
+
+    // Now empty the container Dir
+    Container containerToDelete = getContainerfromDN(
+        hddsDatanodeService, containerId.getId());
+    File chunkDir = new File(containerToDelete.
+        getContainerData().getChunksPath());
+    File[] files = chunkDir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        FileUtils.delete(file);
+      }
+    }
+
+    // send delete container to the datanode, blockCount is still 1(Invalid)
+    command = new DeleteContainerCommand(containerId.getId(), false);
+
+    // Send the delete command. It should succeed as even though blockCount
+    // is non-zero(Invalid).
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
+    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
+
+    GenericTestUtils.waitFor(() ->
+            isContainerDeleted(hddsDatanodeService, containerId.getId()),
+        500, 5 * 1000);
+    Assert.assertTrue(isContainerDeleted(hddsDatanodeService,
+        containerId.getId()));
+
+  }
+
+
   private void clearBlocksTable(Container container) throws IOException {
     try (DBHandle dbHandle
              = BlockUtils.getDB(
@@ -426,12 +503,6 @@ public class TestDeleteContainerHandler {
     // get containerID of the key
     ContainerID containerId = getContainerID(keyName);
 
-    ContainerInfo container = cluster.getStorageContainerManager()
-        .getContainerManager().getContainer(containerId);
-
-    Pipeline pipeline = cluster.getStorageContainerManager()
-        .getPipelineManager().getPipeline(container.getPipelineID());
-
     // We need to close the container because delete container only happens
     // on closed containers when force flag is set to false.
 
@@ -446,12 +517,10 @@ public class TestDeleteContainerHandler {
     NodeManager nodeManager =
         cluster.getStorageContainerManager().getScmNodeManager();
 
+
     //send the order to close the container
-    SCMCommand<?> command = new CloseContainerCommand(
-        containerId.getId(), pipeline.getId());
-    command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
-    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
+    OzoneTestUtils.closeAllContainers(cluster.getStorageContainerManager()
+        .getEventQueue(), cluster.getStorageContainerManager());
 
     GenericTestUtils.waitFor(() ->
             isContainerClosed(hddsDatanodeService, containerId.getId()),
@@ -466,7 +535,8 @@ public class TestDeleteContainerHandler {
         containerId.getId()));
 
     // send delete container to the datanode
-    command = new DeleteContainerCommand(containerId.getId(), false);
+    SCMCommand<?> command = new DeleteContainerCommand(containerId.getId(),
+        false);
     command.setTerm(
         cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
@@ -478,26 +548,23 @@ public class TestDeleteContainerHandler {
         GenericTestUtils.LogCapturer.captureLogs(
             LoggerFactory.getLogger(DeleteContainerCommandHandler.class));
     GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains("Non" +
-        "-force deletion of non-empty container is not allowed"), 500,
+            "-force deletion of non-empty container is not allowed"), 500,
         5 * 1000);
     ContainerMetrics metrics =
         hddsDatanodeService
             .getDatanodeStateMachine().getContainer().getMetrics();
     Assert.assertEquals(1,
-        metrics.getContainerDeleteFailedBlockCountNotZero());
-    // Set container blockCount to 0 to mock that it is empty
-    Container containerToDelete = getContainerfromDN(
-        hddsDatanodeService, containerId.getId());
-    containerToDelete.getContainerData().setBlockCount(0);
-    File chunkDir = new File(containerToDelete.
-        getContainerData().getChunksPath());
-    File[] files = chunkDir.listFiles();
-    if (files != null) {
-      for (File file : files) {
-        FileUtils.delete(file);
-      }
-    }
-    clearBlocksTable(containerToDelete);
+        metrics.getContainerDeleteFailedNonEmpty());
+
+    // Delete key, which will make isEmpty flag to true in containerData
+    objectStore.getVolume(volumeName)
+        .getBucket(bucketName).deleteKey(keyName);
+
+    // Ensure isEmpty flag is true when key is deleted
+    GenericTestUtils.waitFor(() -> getContainerfromDN(
+            hddsDatanodeService, containerId.getId())
+            .getContainerData().isEmpty(),
+        500, 5 * 2000);
 
     // Send the delete command again. It should succeed this time.
     command.setTerm(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to