This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 79d3c873f5 HDDS-9426. Calculate Exclusive size for deep cleaned snapshot's deleted directories (#6099)
79d3c873f5 is described below

commit 79d3c873f5cb69825d042f533033f94caa06717d
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Sat Jan 27 07:19:22 2024 -0800

    HDDS-9426. Calculate Exclusive size for deep cleaned snapshot's deleted directories (#6099)
---
 .../common/src/main/resources/ozone-default.xml    |  19 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  12 +
 .../hadoop/ozone/om/helpers/SnapshotInfo.java      |  44 +-
 .../ozone/om/helpers/TestOmSnapshotInfo.java       |  11 +-
 .../om/TestSnapshotDirectoryCleaningService.java   | 272 +++++++++++
 .../src/main/proto/OmClientProtocol.proto          |  23 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   7 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  30 ++
 .../request/snapshot/OMSnapshotPurgeRequest.java   |  21 +-
 .../snapshot/OMSnapshotSetPropertyRequest.java     |  35 +-
 .../om/service/AbstractKeyDeletingService.java     | 100 ++++
 .../ozone/om/service/KeyDeletingService.java       | 151 ++----
 .../service/SnapshotDirectoryCleaningService.java  | 515 +++++++++++++++++++++
 ...estOMSnapshotSetPropertyRequestAndResponse.java |   8 +-
 .../ozone/om/service/TestKeyDeletingService.java   |   9 +-
 15 files changed, 1097 insertions(+), 160 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 7debcd479e..6a29dc81ec 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3600,6 +3600,25 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.snapshot.directory.service.timeout</name>
+    <value>300s</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>
+      Timeout value for SnapshotDirectoryCleaningService.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.snapshot.directory.service.interval</name>
+    <value>24h</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>
+      The time interval between successive SnapshotDirectoryCleaningService
+      thread runs.
+    </description>
+  </property>
+
   <property>
     <name>ozone.scm.event.ContainerReport.thread.pool.size</name>
     <value>10</value>
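
For reference, the two new properties can also be overridden programmatically, as the integration test added below does. A minimal sketch, assuming an OzoneConfiguration instance (the 2.5 s interval is an arbitrary test value, not a recommended production setting):

    OzoneConfiguration conf = new OzoneConfiguration();
    // Run the snapshot directory cleaning service every 2.5 seconds
    // instead of the 24h default (only sensible for tests).
    conf.setTimeDuration(OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL,
        2500, TimeUnit.MILLISECONDS);
    conf.setTimeDuration(OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT,
        300, TimeUnit.SECONDS);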
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 58f341b74a..5dd7579eb9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -390,6 +390,18 @@ public final class OMConfigKeys {
   public static final String OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT
       = "60s";
 
+  /**
+   * Configuration properties for Snapshot Directory Service.
+   */
+  public static final String OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL =
+      "ozone.snapshot.directory.service.interval";
+  public static final String OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL_DEFAULT
+      = "24h";
+  public static final String OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT =
+      "ozone.snapshot.directory.service.timeout";
+  public static final String
+      OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT = "300s";
+
   public static final String OZONE_PATH_DELETING_LIMIT_PER_TASK =
       "ozone.path.deleting.limit.per.task";
   // default is 6000 taking account of 32MB buffer size, and assuming
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
index 8ee9c6ee1f..56103ccb3a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
@@ -123,6 +123,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
   private long referencedReplicatedSize;
   private long exclusiveSize;
   private long exclusiveReplicatedSize;
+  private boolean deepCleanedDeletedDir;
 
   /**
    * Private constructor, constructed via builder.
@@ -162,7 +163,8 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
                        long referencedSize,
                        long referencedReplicatedSize,
                        long exclusiveSize,
-                       long exclusiveReplicatedSize) {
+                       long exclusiveReplicatedSize,
+                       boolean deepCleanedDeletedDir) {
     this.snapshotId = snapshotId;
     this.name = name;
     this.volumeName = volumeName;
@@ -181,6 +183,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
     this.referencedReplicatedSize = referencedReplicatedSize;
     this.exclusiveSize = exclusiveSize;
     this.exclusiveReplicatedSize = exclusiveReplicatedSize;
+    this.deepCleanedDeletedDir = deepCleanedDeletedDir;
   }
 
   public void setName(String name) {
@@ -285,7 +288,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
   }
 
   public SnapshotInfo.Builder toBuilder() {
-    return new SnapshotInfo.Builder()
+    return new Builder()
         .setSnapshotId(snapshotId)
         .setName(name)
         .setVolumeName(volumeName)
@@ -302,7 +305,8 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         .setReferencedSize(referencedSize)
         .setReferencedReplicatedSize(referencedReplicatedSize)
         .setExclusiveSize(exclusiveSize)
-        .setExclusiveReplicatedSize(exclusiveReplicatedSize);
+        .setExclusiveReplicatedSize(exclusiveReplicatedSize)
+        .setDeepCleanedDeletedDir(deepCleanedDeletedDir);
   }
 
   /**
@@ -327,6 +331,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
     private long referencedReplicatedSize;
     private long exclusiveSize;
     private long exclusiveReplicatedSize;
+    private boolean deepCleanedDeletedDir;
 
     public Builder() {
       // default values
@@ -423,6 +428,11 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
       return this;
     }
 
+    public Builder setDeepCleanedDeletedDir(boolean deepCleanedDeletedDir) {
+      this.deepCleanedDeletedDir = deepCleanedDeletedDir;
+      return this;
+    }
+
     public SnapshotInfo build() {
       Preconditions.checkNotNull(name);
       return new SnapshotInfo(
@@ -443,7 +453,8 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
           referencedSize,
           referencedReplicatedSize,
           exclusiveSize,
-          exclusiveReplicatedSize
+          exclusiveReplicatedSize,
+          deepCleanedDeletedDir
       );
     }
   }
@@ -465,7 +476,8 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
             .setReferencedSize(referencedSize)
             .setReferencedReplicatedSize(referencedReplicatedSize)
             .setExclusiveSize(exclusiveSize)
-            .setExclusiveReplicatedSize(exclusiveReplicatedSize);
+            .setExclusiveReplicatedSize(exclusiveReplicatedSize)
+            .setDeepCleanedDeletedDir(deepCleanedDeletedDir);
 
     if (pathPreviousSnapshotId != null) {
       sib.setPathPreviousSnapshotID(toProtobuf(pathPreviousSnapshotId));
@@ -538,6 +550,11 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
           snapshotInfoProto.getExclusiveReplicatedSize());
     }
 
+    if (snapshotInfoProto.hasDeepCleanedDeletedDir()) {
+      osib.setDeepCleanedDeletedDir(
+          snapshotInfoProto.getDeepCleanedDeletedDir());
+    }
+
     osib.setSnapshotPath(snapshotInfoProto.getSnapshotPath())
         .setCheckpointDir(snapshotInfoProto.getCheckpointDir())
         .setDbTxSequenceNumber(snapshotInfoProto.getDbTxSequenceNumber());
@@ -622,6 +639,14 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
     return exclusiveReplicatedSize;
   }
 
+  public boolean getDeepCleanedDeletedDir() {
+    return deepCleanedDeletedDir;
+  }
+
+  public void setDeepCleanedDeletedDir(boolean deepCleanedDeletedDir) {
+    this.deepCleanedDeletedDir = deepCleanedDeletedDir;
+  }
+
   /**
    * Generate default name of snapshot, (used if user doesn't provide one).
    */
@@ -655,7 +680,8 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         .setSnapshotPath(volumeName + OM_KEY_PREFIX + bucketName)
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .setDeepClean(true);
+        .setDeepClean(false)
+        .setDeepCleanedDeletedDir(false);
 
     if (snapshotId != null) {
       builder.setCheckpointDir(getCheckpointDirName(snapshotId));
@@ -688,7 +714,8 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         referencedSize == that.referencedSize &&
         referencedReplicatedSize == that.referencedReplicatedSize &&
         exclusiveSize == that.exclusiveSize &&
-        exclusiveReplicatedSize == that.exclusiveReplicatedSize;
+        exclusiveReplicatedSize == that.exclusiveReplicatedSize &&
+        deepCleanedDeletedDir == that.deepCleanedDeletedDir;
   }
 
   @Override
@@ -699,7 +726,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         globalPreviousSnapshotId, snapshotPath, checkpointDir,
         deepClean, sstFiltered,
         referencedSize, referencedReplicatedSize,
-        exclusiveSize, exclusiveReplicatedSize);
+        exclusiveSize, exclusiveReplicatedSize, deepCleanedDeletedDir);
   }
 
   /**
@@ -726,6 +753,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         .setReferencedReplicatedSize(referencedReplicatedSize)
         .setExclusiveSize(exclusiveSize)
         .setExclusiveReplicatedSize(exclusiveReplicatedSize)
+        .setDeepCleanedDeletedDir(deepCleanedDeletedDir)
         .build();
   }
 }
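
To illustrate the new flag through the builder API above, a minimal sketch (field values are arbitrary; snapshotId is an assumed java.util.UUID):

    SnapshotInfo info = new SnapshotInfo.Builder()
        .setSnapshotId(snapshotId)
        .setName("snap1")
        .setVolumeName("vol1")
        .setBucketName("buck1")
        // New in this commit: the flag starts out false and is flipped to
        // true once the snapshot's deletedDirTable has been deep cleaned.
        .setDeepCleanedDeletedDir(false)
        .build();
    boolean dirsDeepCleaned = info.getDeepCleanedDeletedDir();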
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
index 6dc3f913d0..dd9cf34c8e 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
@@ -66,12 +66,13 @@ public class TestOmSnapshotInfo {
         .setSnapshotPath(SNAPSHOT_PATH)
         .setCheckpointDir(CHECKPOINT_DIR)
         .setDbTxSequenceNumber(DB_TX_SEQUENCE_NUMBER)
-        .setDeepClean(true)
+        .setDeepClean(false)
         .setSstFiltered(false)
         .setReferencedSize(2000L)
         .setReferencedReplicatedSize(6000L)
         .setExclusiveSize(1000L)
         .setExclusiveReplicatedSize(3000L)
+        .setDeepCleanedDeletedDir(false)
         .build();
   }
 
@@ -89,12 +90,13 @@ public class TestOmSnapshotInfo {
         .setSnapshotPath(SNAPSHOT_PATH)
         .setCheckpointDir(CHECKPOINT_DIR)
         .setDbTxSequenceNumber(DB_TX_SEQUENCE_NUMBER)
-        .setDeepClean(true)
+        .setDeepClean(false)
         .setSstFiltered(false)
         .setReferencedSize(2000L)
         .setReferencedReplicatedSize(6000L)
         .setExclusiveSize(1000L)
         .setExclusiveReplicatedSize(3000L)
+        .setDeepCleanedDeletedDir(false)
         .build();
   }
 
@@ -140,6 +142,9 @@ public class TestOmSnapshotInfo {
     assertEquals(
         snapshotInfoEntryExpected.getExclusiveReplicatedSize(),
         snapshotInfoEntryActual.getExclusiveReplicatedSize());
+    assertEquals(
+        snapshotInfoEntryExpected.getDeepCleanedDeletedDir(),
+        snapshotInfoEntryActual.getDeepCleanedDeletedDir());
 
     assertEquals(snapshotInfoEntryExpected, snapshotInfoEntryActual);
   }
@@ -176,6 +181,8 @@ public class TestOmSnapshotInfo {
         snapshotInfoActual.getExclusiveSize());
     assertEquals(snapshotInfoExpected.getExclusiveReplicatedSize(),
         snapshotInfoActual.getExclusiveReplicatedSize());
+    assertEquals(snapshotInfoExpected.getDeepCleanedDeletedDir(),
+        snapshotInfoActual.getDeepCleanedDeletedDir());
 
     assertEquals(snapshotInfoExpected, snapshotInfoActual);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDirectoryCleaningService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDirectoryCleaningService.java
new file mode 100644
index 0000000000..6b39b76c54
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDirectoryCleaningService.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.service.SnapshotDirectoryCleaningService;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Test Snapshot Directory Service.
+ */
+@Timeout(300)
+public class TestSnapshotDirectoryCleaningService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSnapshotDirectoryCleaningService.class);
+
+  private static MiniOzoneCluster cluster;
+  private static FileSystem fs;
+  private static String volumeName;
+  private static String bucketName;
+  private static OzoneClient client;
+
+  @BeforeAll
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL, 2500);
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 2500,
+        TimeUnit.MILLISECONDS);
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+    client = cluster.newClient();
+
+    // create a volume and a bucket to be used by OzoneFileSystem
+    OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    volumeName = bucket.getVolumeName();
+    bucketName = bucket.getName();
+
+    String rootPath = String.format("%s://%s.%s/",
+        OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName);
+
+    // Set the fs.defaultFS and start the filesystem
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Set the number of keys to be processed per batch operation.
+    conf.setInt(OZONE_FS_ITERATE_BATCH_SIZE, 5);
+
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterAll
+  public static void teardown() {
+    IOUtils.closeQuietly(client);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.closeQuietly(fs);
+  }
+
+  @AfterEach
+  public void cleanup() {
+    try {
+      Path root = new Path("/");
+      FileStatus[] fileStatuses = fs.listStatus(root);
+      for (FileStatus fileStatus : fileStatuses) {
+        fs.delete(fileStatus.getPath(), true);
+      }
+    } catch (IOException ex) {
+      fail("Failed to cleanup files.");
+    }
+  }
+
+  @SuppressWarnings("checkstyle:LineLength")
+  @Test
+  public void testExclusiveSizeWithDirectoryDeepClean() throws Exception {
+
+    Table<String, OmKeyInfo> deletedDirTable =
+        cluster.getOzoneManager().getMetadataManager().getDeletedDirTable();
+    Table<String, OmKeyInfo> keyTable =
+        cluster.getOzoneManager().getMetadataManager()
+            .getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    Table<String, OmDirectoryInfo> dirTable =
+        cluster.getOzoneManager().getMetadataManager().getDirectoryTable();
+    Table<String, RepeatedOmKeyInfo> deletedKeyTable =
+        cluster.getOzoneManager().getMetadataManager().getDeletedTable();
+    Table<String, SnapshotInfo> snapshotInfoTable =
+        cluster.getOzoneManager().getMetadataManager().getSnapshotInfoTable();
+    SnapshotDirectoryCleaningService snapshotDirectoryCleaningService =
+        cluster.getOzoneManager().getKeyManager().getSnapshotDirectoryService();
+
+    /*    DirTable
+    /v/b/snapDir
+    /v/b/snapDir/appRoot0-2/
+    /v/b/snapDir/appRoot0-2/parentDir0-2/
+          FileTable
+    /v/b/snapDir/testKey0 - testKey4  = 5 keys
+    /v/b/snapDir/appRoot0-2/parentDir0-2/childFile = 9 keys
+    /v/b/snapDir/appRoot0/parentDir0-2/childFile0-4 = 15 keys
+     */
+
+    Path root = new Path("/snapDir");
+    // Create parent dir from root.
+    fs.mkdirs(root);
+
+    // Add 5 files inside root dir
+    // Creates /v/b/snapDir/testKey0 - testKey4
+    for (int i = 0; i < 5; i++) {
+      Path path = new Path(root, "testKey" + i);
+      try (FSDataOutputStream stream = fs.create(path)) {
+        stream.write(1);
+      }
+    }
+
+    // Creates /v/b/snapDir/appRoot0-2/parentDir0-2/childFile
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        Path appRoot = new Path(root, "appRoot" + j);
+        Path parent = new Path(appRoot, "parentDir" + i);
+        Path child = new Path(parent, "childFile");
+        try (FSDataOutputStream stream = fs.create(child)) {
+          stream.write(1);
+        }
+      }
+    }
+
+    assertTableRowCount(keyTable, 14);
+    assertTableRowCount(dirTable, 13);
+    // Create snapshot
+    client.getObjectStore().createSnapshot(volumeName, bucketName, "snap1");
+
+    // Creates /v/b/snapDir/appRoot0/parentDir0-2/childFile0-4
+    for (int i = 0; i < 3; i++) {
+      Path appRoot = new Path(root, "appRoot0");
+      Path parent = new Path(appRoot, "parentDir" + i);
+      for (int j = 0; j < 5; j++) {
+        Path child = new Path(parent, "childFile" + j);
+        try (FSDataOutputStream stream = fs.create(child)) {
+          stream.write(1);
+        }
+      }
+    }
+
+    for (int i = 5; i < 10; i++) {
+      Path path = new Path(root, "testKey" + i);
+      try (FSDataOutputStream stream = fs.create(path)) {
+        stream.write(1);
+      }
+    }
+
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(keyTable, 34);
+    assertTableRowCount(dirTable, 13);
+    Path appRoot0 = new Path(root, "appRoot0");
+    // Only parentDir0-2/childFile under appRoot0 is exclusive for snap1
+    fs.delete(appRoot0, true);
+    assertTableRowCount(deletedDirTable, 1);
+    client.getObjectStore().createSnapshot(volumeName, bucketName, "snap2");
+
+    // Delete testKey0-9
+    for (int i = 0; i < 10; i++) {
+      Path testKey = new Path(root, "testKey" + i);
+      fs.delete(testKey, false);
+    }
+
+    fs.delete(root, true);
+    assertTableRowCount(deletedKeyTable, 10);
+    client.getObjectStore().createSnapshot(volumeName, bucketName, "snap3");
+    long prevRunCount = snapshotDirectoryCleaningService.getRunCount().get();
+    GenericTestUtils.waitFor(() -> snapshotDirectoryCleaningService.getRunCount().get()
+        > prevRunCount + 1, 100, 10000);
+
+    Thread.sleep(2000);
+    Map<String, Long> expectedSize = new HashMap<String, Long>() {{
+      // /v/b/snapDir/appRoot0/parentDir0-2/childFile contribute
+      // exclusive size, /v/b/snapDir/appRoot0/parentDir0-2/childFile0-4
+      // are deep cleaned and hence don't contribute to size.
+        put("snap1", 3L);
+      // Only testKey5-9 contribute to the exclusive size
+        put("snap2", 5L);
+        put("snap3", 0L);
+      }};
+    Thread.sleep(500);
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
+        iterator = snapshotInfoTable.iterator()) {
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
+        String snapshotName = snapshotEntry.getValue().getName();
+        assertEquals(expectedSize.get(snapshotName), snapshotEntry.getValue().
+            getExclusiveSize());
+        // Since for the test we are using RATIS/THREE
+        assertEquals(expectedSize.get(snapshotName) * 3,
+            snapshotEntry.getValue().getExclusiveReplicatedSize());
+
+      }
+    }
+  }
+
+  private void assertTableRowCount(Table<String, ?> table, int count)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> assertTableRowCount(count, table), 1000,
+        120000); // 2 minutes
+  }
+
+  private boolean assertTableRowCount(int expectedCount,
+                                      Table<String, ?> table) {
+    long count = 0L;
+    try {
+      count = cluster.getOzoneManager().getMetadataManager()
+          .countRowsInTable(table);
+      LOG.info("{} actual row count={}, expectedCount={}", table.getName(),
+          count, expectedCount);
+    } catch (IOException ex) {
+      fail("testDoubleBuffer failed with: " + ex);
+    }
+    return count == expectedCount;
+  }
+}
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 19844eed01..6a1a51bc32 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -522,7 +522,7 @@ enum Status {
     UNAUTHORIZED = 91;
 
     S3_SECRET_ALREADY_EXISTS = 92;
-    
+
     INVALID_PATH = 93;
 
     TOO_MANY_BUCKETS = 94;
@@ -852,6 +852,7 @@ message SnapshotInfo {
   // snapshot exclusive size after replication
   optional uint64 exclusiveReplicatedSize = 18;
   // note: shared sizes can be calculated from: referenced - exclusive
+  optional bool deepCleanedDeletedDir = 19;
 }
 
 message SnapshotDiffJobProto {
@@ -1892,17 +1893,27 @@ message SnapshotMoveKeyInfos {
 
 message SnapshotPurgeRequest {
   repeated string snapshotDBKeys = 1;
-  repeated string updatedSnapshotDBKey = 2;
+  repeated string updatedSnapshotDBKey = 2 [deprecated = true];
 }
 
 message SetSnapshotPropertyRequest {
-  optional SnapshotProperty snapshotProperty = 1;
+  optional SnapshotProperty snapshotProperty = 1 [deprecated = true];
+  optional string snapshotKey = 2;
+  optional SnapshotSize snapshotSize = 3;
+  optional bool deepCleanedDeletedDir = 4;
+  optional bool deepCleanedDeletedKey = 5;
 }
 
+// SnapshotProperty is entirely deprecated. Keeping it here for proto.lock compatibility
 message SnapshotProperty {
-  optional string snapshotKey = 1;
-  optional uint64 exclusiveSize = 2;
-  optional uint64 exclusiveReplicatedSize = 3;
+  optional string snapshotKey = 1 [deprecated = true];
+  optional uint64 exclusiveSize = 2 [deprecated = true];
+  optional uint64 exclusiveReplicatedSize = 3 [deprecated = true];
+}
+
+message SnapshotSize {
+  optional uint64 exclusiveSize = 1;
+  optional uint64 exclusiveReplicatedSize = 2;
 }
 
 message DeleteTenantRequest {
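
With the reshaped request, callers set the snapshot key and the individual properties directly instead of wrapping them in SnapshotProperty. A sketch using the generated builders (dbKey is an assumed snapshot table key):

    SnapshotSize snapshotSize = SnapshotSize.newBuilder()
        .setExclusiveSize(1000L)
        .setExclusiveReplicatedSize(3000L)
        .build();
    SetSnapshotPropertyRequest request = SetSnapshotPropertyRequest.newBuilder()
        .setSnapshotKey(dbKey)
        .setSnapshotSize(snapshotSize)
        // Mark the snapshot's deleted directories as deep cleaned.
        .setDeepCleanedDeletedDir(true)
        .build();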
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 0fe1cdbe80..4378701426 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.service.KeyDeletingService;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import org.apache.hadoop.ozone.om.service.SnapshotDirectoryCleaningService;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 
 import java.io.IOException;
@@ -285,4 +286,10 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    * @return Background service.
    */
   SnapshotDeletingService getSnapshotDeletingService();
+
+  /**
+   * Returns the instance of SnapshotDirectoryCleaningService.
+   * @return Background service.
+   */
+  SnapshotDirectoryCleaningService getSnapshotDirectoryService();
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 24646d3128..a84fdaf1a6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.ozone.om.service.KeyDeletingService;
 import org.apache.hadoop.ozone.om.service.MultipartUploadCleanupService;
 import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import org.apache.hadoop.ozone.om.service.SnapshotDirectoryCleaningService;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
@@ -131,6 +132,10 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
@@ -181,6 +186,7 @@ public class KeyManagerImpl implements KeyManager {
 
   private BackgroundService openKeyCleanupService;
   private BackgroundService multipartUploadCleanupService;
+  private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
 
   public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
       OzoneConfiguration conf, OMPerformanceMetrics metrics) {
@@ -300,6 +306,22 @@ public class KeyManagerImpl implements KeyManager {
       }
     }
 
+    if (snapshotDirectoryCleaningService == null &&
+        ozoneManager.isFilesystemSnapshotEnabled()) {
+      long dirDeleteInterval = configuration.getTimeDuration(
+          OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL,
+          OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      long serviceTimeout = configuration.getTimeDuration(
+          OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT,
+          OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      snapshotDirectoryCleaningService = new SnapshotDirectoryCleaningService(
+          dirDeleteInterval, TimeUnit.MILLISECONDS, serviceTimeout,
+          ozoneManager, scmClient.getBlockClient());
+      snapshotDirectoryCleaningService.start();
+    }
+
     if (multipartUploadCleanupService == null) {
       long serviceInterval = configuration.getTimeDuration(
           OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL,
@@ -346,6 +368,10 @@ public class KeyManagerImpl implements KeyManager {
       multipartUploadCleanupService.shutdown();
       multipartUploadCleanupService = null;
     }
+    if (snapshotDirectoryCleaningService != null) {
+      snapshotDirectoryCleaningService.shutdown();
+      snapshotDirectoryCleaningService = null;
+    }
   }
 
   private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -681,6 +707,10 @@ public class KeyManagerImpl implements KeyManager {
     return snapshotDeletingService;
   }
 
+  public SnapshotDirectoryCleaningService getSnapshotDirectoryService() {
+    return snapshotDirectoryCleaningService;
+  }
+
   public boolean isSstFilteringSvcEnabled() {
     long serviceInterval = ozoneManager.getConfiguration()
         .getTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
index 09711c7045..b7dba82602 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
@@ -72,22 +72,10 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {
     try {
       List<String> snapshotDbKeys = snapshotPurgeRequest
           .getSnapshotDBKeysList();
-      List<String> snapInfosToUpdate = snapshotPurgeRequest
-          .getUpdatedSnapshotDBKeyList();
       Map<String, SnapshotInfo> updatedSnapInfos = new HashMap<>();
       Map<String, SnapshotInfo> updatedPathPreviousAndGlobalSnapshots =
           new HashMap<>();
 
-      // Snapshots that are already deepCleaned by the KeyDeletingService
-      // can be marked as deepCleaned.
-      for (String snapTableKey : snapInfosToUpdate) {
-        SnapshotInfo snapInfo = omMetadataManager.getSnapshotInfoTable()
-            .get(snapTableKey);
-
-        updateSnapshotInfoAndCache(snapInfo, omMetadataManager,
-            trxnLogIndex, updatedSnapInfos, false);
-      }
-
       // Snapshots that are purged by the SnapshotDeletingService
       // will update the next snapshot so that it can be deep cleaned
       // by the KeyDeletingService in the next run.
@@ -100,7 +88,7 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {
                 snapshotChainManager, omSnapshotManager);
 
         updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager,
-            trxnLogIndex, updatedSnapInfos, true);
+            trxnLogIndex, updatedSnapInfos);
         updateSnapshotChainAndCache(omMetadataManager, fromSnapshot,
             trxnLogIndex, updatedPathPreviousAndGlobalSnapshots);
         ozoneManager.getOmSnapshotManager().getSnapshotCache()
@@ -120,9 +108,12 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {
 
   private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
       OmMetadataManagerImpl omMetadataManager, long trxnLogIndex,
-      Map<String, SnapshotInfo> updatedSnapInfos, boolean deepClean) {
+      Map<String, SnapshotInfo> updatedSnapInfos) {
     if (snapInfo != null) {
-      snapInfo.setDeepClean(deepClean);
+      // Set the next snapshot's deep clean flag to false, since the
+      // current snapshot is deleted. We can potentially
+      // reclaim more keys in the next snapshot.
+      snapInfo.setDeepClean(false);
 
       // Update table cache first
       omMetadataManager.getSnapshotInfoTable().addCacheEntry(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotSetPropertyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotSetPropertyRequest.java
index 966a265af8..b3dd5206c9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotSetPropertyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotSetPropertyRequest.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotSetPropertyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotProperty;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,16 +60,10 @@ public class OMSnapshotSetPropertyRequest extends OMClientRequest {
     OzoneManagerProtocolProtos.SetSnapshotPropertyRequest
         setSnapshotPropertyRequest = getOmRequest()
         .getSetSnapshotPropertyRequest();
-
-    SnapshotProperty snapshotProperty = setSnapshotPropertyRequest
-        .getSnapshotProperty();
     SnapshotInfo updatedSnapInfo = null;
 
     try {
-      String snapshotKey = snapshotProperty.getSnapshotKey();
-      long exclusiveSize = snapshotProperty.getExclusiveSize();
-      long exclusiveReplicatedSize = snapshotProperty
-          .getExclusiveReplicatedSize();
+      String snapshotKey = setSnapshotPropertyRequest.getSnapshotKey();
       updatedSnapInfo = metadataManager.getSnapshotInfoTable()
           .get(snapshotKey);
 
@@ -79,9 +73,28 @@ public class OMSnapshotSetPropertyRequest extends OMClientRequest {
             " is not found", INVALID_SNAPSHOT_ERROR);
       }
 
-      // Set Exclusive size.
-      updatedSnapInfo.setExclusiveSize(exclusiveSize);
-      updatedSnapInfo.setExclusiveReplicatedSize(exclusiveReplicatedSize);
+      if (setSnapshotPropertyRequest.hasDeepCleanedDeletedDir()) {
+        updatedSnapInfo.setDeepCleanedDeletedDir(setSnapshotPropertyRequest
+            .getDeepCleanedDeletedDir());
+      }
+
+      if (setSnapshotPropertyRequest.hasDeepCleanedDeletedKey()) {
+        updatedSnapInfo.setDeepClean(setSnapshotPropertyRequest
+            .getDeepCleanedDeletedKey());
+      }
+
+      if (setSnapshotPropertyRequest.hasSnapshotSize()) {
+        SnapshotSize snapshotSize = setSnapshotPropertyRequest
+            .getSnapshotSize();
+        long exclusiveSize = updatedSnapInfo.getExclusiveSize() +
+            snapshotSize.getExclusiveSize();
+        long exclusiveReplicatedSize = updatedSnapInfo
+            .getExclusiveReplicatedSize() + snapshotSize
+            .getExclusiveReplicatedSize();
+        // Set Exclusive size.
+        updatedSnapInfo.setExclusiveSize(exclusiveSize);
+        updatedSnapInfo.setExclusiveReplicatedSize(exclusiveReplicatedSize);
+      }
       // Update Table Cache
       metadataManager.getSnapshotInfoTable().addCacheEntry(
           new CacheKey<>(snapshotKey),
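
Note the accumulation semantics above: exclusive sizes now arrive incrementally (one batch per background service run) and are added to the stored values rather than overwriting them. A worked example with invented numbers:

    // Stored so far: exclusiveSize = 700, exclusiveReplicatedSize = 2100.
    // Incoming SnapshotSize: exclusiveSize = 300, exclusiveReplicatedSize = 900.
    long exclusiveSize = updatedSnapInfo.getExclusiveSize()
        + snapshotSize.getExclusiveSize();            // 700 + 300 = 1000
    long exclusiveReplicatedSize = updatedSnapInfo.getExclusiveReplicatedSize()
        + snapshotSize.getExclusiveReplicatedSize();  // 2100 + 900 = 3000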
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 1091053ebd..21ad087276 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -469,6 +469,106 @@ public abstract class AbstractKeyDeletingService extends BackgroundService
     return remainNum;
   }
 
+  /**
+   * To calculate the exclusive size of the current snapshot, check
+   * the next snapshot's deletedTable: if a deleted key is
+   * referenced in the current snapshot but not referenced in the
+   * previous snapshot, then that key is exclusive to the current
+   * snapshot. Since we are only iterating through the
+   * deletedTable, we can check the previous and previous-to-
+   * previous snapshots to achieve the same.
+   * previousSnapshot - Snapshot for which the exclusive size is
+   *                    being calculated.
+   * currSnapshot - Snapshot whose deletedTable is used to calculate
+   *                previousSnapshot's exclusive size.
+   * previousToPrevSnapshot - Snapshot used to check
+   *                 whether a key is exclusive to previousSnapshot.
+   */
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  public void calculateExclusiveSize(
+      SnapshotInfo previousSnapshot,
+      SnapshotInfo previousToPrevSnapshot,
+      OmKeyInfo keyInfo,
+      OmBucketInfo bucketInfo, long volumeId,
+      Table<String, String> snapRenamedTable,
+      Table<String, OmKeyInfo> previousKeyTable,
+      Table<String, String> prevRenamedTable,
+      Table<String, OmKeyInfo> previousToPrevKeyTable,
+      Map<String, Long> exclusiveSizeMap,
+      Map<String, Long> exclusiveReplicatedSizeMap) throws IOException {
+    String prevSnapKey = previousSnapshot.getTableKey();
+    long exclusiveReplicatedSize =
+        exclusiveReplicatedSizeMap.getOrDefault(
+            prevSnapKey, 0L) + keyInfo.getReplicatedSize();
+    long exclusiveSize = exclusiveSizeMap.getOrDefault(
+        prevSnapKey, 0L) + keyInfo.getDataSize();
+
+    // If there is no previous to previous snapshot, then
+    // the previous snapshot is the first snapshot.
+    if (previousToPrevSnapshot == null) {
+      exclusiveSizeMap.put(prevSnapKey, exclusiveSize);
+      exclusiveReplicatedSizeMap.put(prevSnapKey,
+          exclusiveReplicatedSize);
+    } else {
+      OmKeyInfo keyInfoPrevSnapshot = getPreviousSnapshotKeyName(
+          keyInfo, bucketInfo, volumeId,
+          snapRenamedTable, previousKeyTable);
+      OmKeyInfo keyInfoPrevToPrevSnapshot = getPreviousSnapshotKeyName(
+          keyInfoPrevSnapshot, bucketInfo, volumeId,
+          prevRenamedTable, previousToPrevKeyTable);
+      // If the previous-to-previous snapshot doesn't
+      // have the key, then the key is exclusive to the
+      // previous snapshot.
+      if (keyInfoPrevToPrevSnapshot == null) {
+        exclusiveSizeMap.put(prevSnapKey, exclusiveSize);
+        exclusiveReplicatedSizeMap.put(prevSnapKey,
+            exclusiveReplicatedSize);
+      }
+    }
+  }
+
+  private OmKeyInfo getPreviousSnapshotKeyName(
+      OmKeyInfo keyInfo, OmBucketInfo bucketInfo, long volumeId,
+      Table<String, String> snapRenamedTable,
+      Table<String, OmKeyInfo> previousKeyTable) throws IOException {
+
+    if (keyInfo == null) {
+      return null;
+    }
+
+    String dbKeyPrevSnap;
+    if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+      dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzonePathKey(
+          volumeId,
+          bucketInfo.getObjectID(),
+          keyInfo.getParentObjectID(),
+          keyInfo.getFileName());
+    } else {
+      dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzoneKey(
+          keyInfo.getVolumeName(),
+          keyInfo.getBucketName(),
+          keyInfo.getKeyName());
+    }
+
+    String dbRenameKey = getOzoneManager().getMetadataManager().getRenameKey(
+        keyInfo.getVolumeName(),
+        keyInfo.getBucketName(),
+        keyInfo.getObjectID());
+
+    String renamedKey = snapRenamedTable.getIfExist(dbRenameKey);
+    OmKeyInfo prevKeyInfo = renamedKey != null ?
+        previousKeyTable.get(renamedKey) :
+        previousKeyTable.get(dbKeyPrevSnap);
+
+    if (prevKeyInfo == null ||
+        prevKeyInfo.getObjectID() != keyInfo.getObjectID()) {
+      return null;
+    }
+
+    return isBlockLocationInfoSame(prevKeyInfo, keyInfo) ?
+        prevKeyInfo : null;
+  }
+
   protected boolean isBufferLimitCrossed(
       int maxLimit, int cLimit, int increment) {
     return cLimit + increment >= maxLimit;
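
To make the chain logic concrete, a hypothetical walk through the calculateExclusiveSize() arguments (snapshot names and sizes are invented for illustration):

    // Chain: snap0 <- snap1 <- snap2. Iterating snap2's deletedTable to
    // compute snap1's exclusive size: previousSnapshot = snap1,
    // currSnapshot = snap2, previousToPrevSnapshot = snap0.
    //
    // Key "k" (dataSize = 100) found in snap2's deletedTable:
    //   - "k" resolves in snap1's keyTable, so snap1 references it.
    //   - getPreviousSnapshotKeyName(...) returns null for snap0, i.e.
    //     snap0 never referenced the same object, so "k" is exclusive
    //     to snap1: exclusiveSizeMap adds 100 under snap1's table key.
    //
    // If snap0 did reference the same object ID with the same blocks,
    // "k" would be shared, and the maps would be left untouched.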
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index 6dcc2544b4..e89608e82d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -48,8 +48,7 @@ import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotProperty;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
@@ -98,6 +97,7 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
   private final Map<String, Long> exclusiveSizeMap;
   private final Map<String, Long> exclusiveReplicatedSizeMap;
   private final Set<String> completedExclusiveSizeSet;
+  private final Map<String, String> snapshotSeekMap;
 
   public KeyDeletingService(OzoneManager ozoneManager,
       ScmBlockLocationProtocol scmClient,
@@ -116,6 +116,7 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
     this.exclusiveSizeMap = new HashMap<>();
     this.exclusiveReplicatedSizeMap = new HashMap<>();
     this.completedExclusiveSizeSet = new HashSet<>();
+    this.snapshotSeekMap = new HashMap<>();
   }
 
   /**
@@ -258,8 +259,8 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
 
           // Deep clean only on active snapshot. Deleted Snapshots will be
           // cleaned up by SnapshotDeletingService.
-          if (!currSnapInfo.getSnapshotStatus().equals(SNAPSHOT_ACTIVE) ||
-              !currSnapInfo.getDeepClean()) {
+          if (currSnapInfo.getSnapshotStatus() != SNAPSHOT_ACTIVE ||
+              currSnapInfo.getDeepClean()) {
             continue;
           }
 
@@ -342,11 +343,22 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
                 RepeatedOmKeyInfo>> deletedIterator = snapDeletedTable
                 .iterator()) {
 
-              deletedIterator.seek(snapshotBucketKey);
+              String lastKeyInCurrentRun = null;
+              String deletedTableSeek = snapshotSeekMap.getOrDefault(
+                  currSnapInfo.getTableKey(), snapshotBucketKey);
+              deletedIterator.seek(deletedTableSeek);
+              // To avoid processing the last key from the previous
+              // run again.
+              if (!deletedTableSeek.equals(snapshotBucketKey) &&
+                  deletedIterator.hasNext()) {
+                deletedIterator.next();
+              }
+
               while (deletedIterator.hasNext() && delCount < keyLimitPerTask) {
                 Table.KeyValue<String, RepeatedOmKeyInfo>
                     deletedKeyValue = deletedIterator.next();
                 String deletedKey = deletedKeyValue.getKey();
+                lastKeyInCurrentRun = deletedKey;
 
                 // Exit if it is out of the bucket scope.
                 if (!deletedKey.startsWith(snapshotBucketKey)) {
@@ -366,7 +378,8 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
                     calculateExclusiveSize(previousSnapshot,
                         previousToPrevSnapshot, keyInfo, bucketInfo, volumeId,
                         snapRenamedTable, previousKeyTable, prevRenamedTable,
-                        previousToPrevKeyTable);
+                        previousToPrevKeyTable, exclusiveSizeMap,
+                        exclusiveReplicatedSizeMap);
                   }
 
                   if (isKeyReclaimable(previousKeyTable, snapRenamedTable,
@@ -406,6 +419,15 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
                   completedExclusiveSizeSet.add(
                       previousSnapshot.getTableKey());
                 }
+
+                snapshotSeekMap.remove(currSnapInfo.getTableKey());
+              } else {
+                // There are keys that still need processing;
+                // we can continue from the last key in the next iteration.
+                if (lastKeyInCurrentRun != null) {
+                  snapshotSeekMap.put(currSnapInfo.getTableKey(),
+                      lastKeyInCurrentRun);
+                }
               }
 
               if (!keysToPurge.isEmpty()) {
@@ -420,98 +442,8 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
         }
       }
 
-      updateSnapshotExclusiveSize();
       updateDeepCleanedSnapshots(deepCleanedSnapshots);
-    }
-
-    /**
-     * To calculate Exclusive Size for current snapshot, Check
-     * the next snapshot deletedTable if the deleted key is
-     * referenced in current snapshot and not referenced in the
-     * previous snapshot then that key is exclusive to the current
-     * snapshot. Here since we are only iterating through
-     * deletedTable we can check the previous and previous to
-     * previous snapshot to achieve the same.
-     * previousSnapshot - Snapshot for which exclusive size is
-     *                    getting calculating.
-     * currSnapshot - Snapshot's deletedTable is used to calculate
-     *                previousSnapshot snapshot's exclusive size.
-     * previousToPrevSnapshot - Snapshot which is used to check
-     *                 if key is exclusive to previousSnapshot.
-     */
-    @SuppressWarnings("checkstyle:ParameterNumber")
-    private void calculateExclusiveSize(
-        SnapshotInfo previousSnapshot,
-        SnapshotInfo previousToPrevSnapshot,
-        OmKeyInfo keyInfo,
-        OmBucketInfo bucketInfo, long volumeId,
-        Table<String, String> snapRenamedTable,
-        Table<String, OmKeyInfo> previousKeyTable,
-        Table<String, String> prevRenamedTable,
-        Table<String, OmKeyInfo> previousToPrevKeyTable) throws IOException {
-      String prevSnapKey = previousSnapshot.getTableKey();
-      long exclusiveReplicatedSize =
-          exclusiveReplicatedSizeMap.getOrDefault(
-              prevSnapKey, 0L) + keyInfo.getReplicatedSize();
-      long exclusiveSize = exclusiveSizeMap.getOrDefault(
-          prevSnapKey, 0L) + keyInfo.getDataSize();
-
-      // If there is no previous to previous snapshot, then
-      // the previous snapshot is the first snapshot.
-      if (previousToPrevSnapshot == null) {
-        exclusiveSizeMap.put(prevSnapKey, exclusiveSize);
-        exclusiveReplicatedSizeMap.put(prevSnapKey,
-            exclusiveReplicatedSize);
-      } else {
-        OmKeyInfo keyInfoPrevSnapshot = getPreviousSnapshotKeyName(
-                keyInfo, bucketInfo, volumeId,
-                snapRenamedTable, previousKeyTable);
-        OmKeyInfo keyInfoPrevToPrevSnapshot = getPreviousSnapshotKeyName(
-                keyInfoPrevSnapshot, bucketInfo, volumeId,
-                prevRenamedTable, previousToPrevKeyTable);
-        // If the previous to previous snapshot doesn't
-        // have the key, then it is exclusive size for the
-        // previous snapshot.
-        if (keyInfoPrevToPrevSnapshot == null) {
-          exclusiveSizeMap.put(prevSnapKey, exclusiveSize);
-          exclusiveReplicatedSizeMap.put(prevSnapKey,
-              exclusiveReplicatedSize);
-        }
-      }
-    }
-
-    private OmKeyInfo getPreviousSnapshotKeyName(
-        OmKeyInfo keyInfo, OmBucketInfo bucketInfo, long volumeId,
-        Table<String, String> snapRenamedTable,
-        Table<String, OmKeyInfo> previousKeyTable) throws IOException {
-
-      if (keyInfo == null) {
-        return null;
-      }
-
-      String dbKeyPrevSnap;
-      if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
-        dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzonePathKey(
-            volumeId,
-            bucketInfo.getObjectID(),
-            keyInfo.getParentObjectID(),
-            keyInfo.getFileName());
-      } else {
-        dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzoneKey(
-            keyInfo.getVolumeName(),
-            keyInfo.getBucketName(),
-            keyInfo.getKeyName());
-      }
-
-      String dbRenameKey = getOzoneManager().getMetadataManager().getRenameKey(
-          keyInfo.getVolumeName(),
-          keyInfo.getBucketName(),
-          keyInfo.getObjectID());
-
-      String renamedKey = snapRenamedTable.getIfExist(dbRenameKey);
-      dbKeyPrevSnap = renamedKey != null ? renamedKey : dbKeyPrevSnap;
-
-      return previousKeyTable.get(dbKeyPrevSnap);
+      updateSnapshotExclusiveSize();
     }
 
     private void updateSnapshotExclusiveSize() {
@@ -525,15 +457,15 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
       while (completedSnapshotIterator.hasNext()) {
         ClientId clientId = ClientId.randomId();
         String dbKey = completedSnapshotIterator.next();
-        SnapshotProperty snapshotProperty = SnapshotProperty.newBuilder()
-                .setSnapshotKey(dbKey)
-                .setExclusiveSize(exclusiveSizeMap.get(dbKey))
+        SnapshotSize snapshotSize = SnapshotSize.newBuilder()
+                .setExclusiveSize(exclusiveSizeMap.getOrDefault(dbKey, 0L))
                 .setExclusiveReplicatedSize(
-                    exclusiveReplicatedSizeMap.get(dbKey))
+                    exclusiveReplicatedSizeMap.getOrDefault(dbKey, 0L))
                 .build();
         SetSnapshotPropertyRequest setSnapshotPropertyRequest =
             SetSnapshotPropertyRequest.newBuilder()
-                .setSnapshotProperty(snapshotProperty)
+                .setSnapshotKey(dbKey)
+                .setSnapshotSize(snapshotSize)
                 .build();
 
         OMRequest omRequest = OMRequest.newBuilder()
@@ -549,16 +481,17 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
     }
 
     private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) {
-      if (!deepCleanedSnapshots.isEmpty()) {
+      for (String deepCleanedSnapshot: deepCleanedSnapshots) {
         ClientId clientId = ClientId.randomId();
-        SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest
-            .newBuilder()
-            .addAllUpdatedSnapshotDBKey(deepCleanedSnapshots)
-            .build();
+        SetSnapshotPropertyRequest setSnapshotPropertyRequest =
+            SetSnapshotPropertyRequest.newBuilder()
+                .setSnapshotKey(deepCleanedSnapshot)
+                .setDeepCleanedDeletedKey(true)
+                .build();
 
         OMRequest omRequest = OMRequest.newBuilder()
-            .setCmdType(Type.SnapshotPurge)
-            .setSnapshotPurgeRequest(snapshotPurgeRequest)
+            .setCmdType(Type.SetSnapshotProperty)
+            .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
             .setClientId(clientId.toString())
             .build();
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
new file mode 100644
index 0000000000..9a60f63038
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.getDirectoryInfo;
+import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getOzonePathKeyForFso;
+
+/**
+ * Snapshot background service that deep-cleans deleted directories and
+ * calculates the exclusive size of deleted directories per snapshot.
+ */
+public class SnapshotDirectoryCleaningService
+    extends AbstractKeyDeletingService {
+  // Use only a single thread for directory deletion. Multiple threads would
+  // read from or write to the same tables and could send deletion requests
+  // for the same key multiple times.
+  private static final int SNAPSHOT_DIR_CORE_POOL_SIZE = 1;
+
+  private final AtomicBoolean suspended;
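+  // In-memory accumulators keyed by snapshot table key: bytes referenced
+  // only by that snapshot (raw and replicated). They are flushed via a
+  // SetSnapshotProperty request once a snapshot has been fully processed.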
+  private final Map<String, Long> exclusiveSizeMap;
+  private final Map<String, Long> exclusiveReplicatedSizeMap;
+
+  public SnapshotDirectoryCleaningService(long interval, TimeUnit unit,
+                                          long serviceTimeout,
+                                          OzoneManager ozoneManager,
+                                          ScmBlockLocationProtocol scmClient) {
+    super(SnapshotDirectoryCleaningService.class.getSimpleName(),
+        interval, unit, SNAPSHOT_DIR_CORE_POOL_SIZE, serviceTimeout,
+        ozoneManager, scmClient);
+    this.suspended = new AtomicBoolean(false);
+    this.exclusiveSizeMap = new HashMap<>();
+    this.exclusiveReplicatedSizeMap = new HashMap<>();
+  }
+
+  private boolean shouldRun() {
+    if (getOzoneManager() == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    return getOzoneManager().isLeaderReady() && !suspended.get();
+  }
+
+  /**
+   * Suspend the service.
+   */
+  @VisibleForTesting
+  public void suspend() {
+    suspended.set(true);
+  }
+
+  /**
+   * Resume the service if suspended.
+   */
+  @VisibleForTesting
+  public void resume() {
+    suspended.set(false);
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new SnapshotDirectoryCleaningService.SnapshotDirTask());
+    return queue;
+  }
+
+  private class SnapshotDirTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() {
+      if (!shouldRun()) {
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+      LOG.debug("Running SnapshotDirectoryCleaningService");
+
+      getRunCount().incrementAndGet();
+      OmSnapshotManager omSnapshotManager =
+          getOzoneManager().getOmSnapshotManager();
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          getOzoneManager().getMetadataManager().getSnapshotInfoTable();
+      OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
+          getOzoneManager().getMetadataManager();
+      SnapshotChainManager snapChainManager = metadataManager
+          .getSnapshotChainManager();
+
+      try (TableIterator<String, ? extends Table.KeyValue
+          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
+
+        while (iterator.hasNext()) {
+          SnapshotInfo currSnapInfo = iterator.next().getValue();
+
+          // Expand deleted dirs only for active snapshots. Deleted snapshots
+          // are cleaned up by SnapshotDeletingService.
+          if (currSnapInfo.getSnapshotStatus() != SNAPSHOT_ACTIVE ||
+              currSnapInfo.getDeepCleanedDeletedDir()) {
+            continue;
+          }
+
+          ReferenceCounted<IOmMetadataReader, SnapshotCache>
+              rcPrevOmSnapshot = null;
+          ReferenceCounted<IOmMetadataReader, SnapshotCache>
+              rcPrevToPrevOmSnapshot = null;
+          try {
+            long volumeId = metadataManager
+                .getVolumeId(currSnapInfo.getVolumeName());
+            // Get bucketInfo for the snapshot bucket to get bucket layout.
+            String dbBucketKey = metadataManager
+                .getBucketKey(currSnapInfo.getVolumeName(),
+                    currSnapInfo.getBucketName());
+            OmBucketInfo bucketInfo = metadataManager
+                .getBucketTable().get(dbBucketKey);
+
+            if (bucketInfo == null) {
+              throw new IllegalStateException("Bucket /" +
+                  currSnapInfo.getVolumeName() + "/" +
+                  currSnapInfo.getBucketName() +
+                  " is not found. BucketInfo should not be null for a" +
+                  " snapshotted bucket. The OM is in an unexpected state.");
+            }
+
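+            // The previous and the previous-to-previous snapshots in the
+            // chain determine reclaimability: a key still referenced by the
+            // previous snapshot cannot be reclaimed here, and a key absent
+            // from the previous-to-previous snapshot is exclusive to the
+            // previous snapshot.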
+            SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(
+                currSnapInfo, snapChainManager, omSnapshotManager);
+            SnapshotInfo previousToPrevSnapshot = null;
+
+            Table<String, OmKeyInfo> previousKeyTable = null;
+            Table<String, String> prevRenamedTable = null;
+
+            if (previousSnapshot != null) {
+              rcPrevOmSnapshot = omSnapshotManager.checkForSnapshot(
+                  previousSnapshot.getVolumeName(),
+                  previousSnapshot.getBucketName(),
+                  getSnapshotPrefix(previousSnapshot.getName()), false);
+              OmSnapshot omPreviousSnapshot = (OmSnapshot)
+                  rcPrevOmSnapshot.get();
+
+              previousKeyTable = omPreviousSnapshot.getMetadataManager()
+                  .getKeyTable(bucketInfo.getBucketLayout());
+              prevRenamedTable = omPreviousSnapshot
+                  .getMetadataManager().getSnapshotRenamedTable();
+              previousToPrevSnapshot = getPreviousActiveSnapshot(
+                  previousSnapshot, snapChainManager, omSnapshotManager);
+            }
+
+            Table<String, OmKeyInfo> previousToPrevKeyTable = null;
+            if (previousToPrevSnapshot != null) {
+              rcPrevToPrevOmSnapshot = omSnapshotManager.checkForSnapshot(
+                  previousToPrevSnapshot.getVolumeName(),
+                  previousToPrevSnapshot.getBucketName(),
+                  getSnapshotPrefix(previousToPrevSnapshot.getName()), false);
+              OmSnapshot omPreviousToPrevSnapshot = (OmSnapshot)
+                  rcPrevToPrevOmSnapshot.get();
+
+              previousToPrevKeyTable = omPreviousToPrevSnapshot
+                  .getMetadataManager()
+                  .getKeyTable(bucketInfo.getBucketLayout());
+            }
+
+            String dbBucketKeyForDir = getOzonePathKeyForFso(metadataManager,
+                currSnapInfo.getVolumeName(), currSnapInfo.getBucketName());
+            try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
+                     rcCurrOmSnapshot = omSnapshotManager.checkForSnapshot(
+                currSnapInfo.getVolumeName(),
+                currSnapInfo.getBucketName(),
+                getSnapshotPrefix(currSnapInfo.getName()),
+                false)) {
+
+              OmSnapshot currOmSnapshot = (OmSnapshot) rcCurrOmSnapshot.get();
+              Table<String, OmKeyInfo> snapDeletedDirTable =
+                  currOmSnapshot.getMetadataManager().getDeletedDirTable();
+
+              try (TableIterator<String, ? extends Table.KeyValue<String,
+                  OmKeyInfo>> deletedDirIterator = snapDeletedDirTable
+                  .iterator(dbBucketKeyForDir)) {
+
+                while (deletedDirIterator.hasNext()) {
+                  Table.KeyValue<String, OmKeyInfo> deletedDirInfo =
+                      deletedDirIterator.next();
+
+                  // For each deleted directory, do an in-memory DFS to deep
+                  // clean it and calculate its exclusive size.
+                  iterateDirectoryTree(deletedDirInfo, volumeId, bucketInfo,
+                      previousSnapshot, previousToPrevSnapshot,
+                      currOmSnapshot, previousKeyTable, prevRenamedTable,
+                      previousToPrevKeyTable, dbBucketKeyForDir);
+                }
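+                // All deleted dirs of this snapshot are processed; persist
+                // the deep-clean flag so the snapshot is skipped next run.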
+                updateDeepCleanSnapshotDir(currSnapInfo.getTableKey());
+                if (previousSnapshot != null) {
+                  updateExclusiveSize(previousSnapshot.getTableKey());
+                }
+              }
+            }
+          } finally {
+            IOUtils.closeQuietly(rcPrevOmSnapshot, rcPrevToPrevOmSnapshot);
+          }
+        }
+      } catch (IOException ex) {
+        LOG.error("Error while running directory deep clean on snapshots." +
+            " Will retry at next run.", ex);
+      }
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+  }
+
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  private void iterateDirectoryTree(
+      Table.KeyValue<String, OmKeyInfo> deletedDirInfo, long volumeId,
+      OmBucketInfo bucketInfo,
+      SnapshotInfo previousSnapshot,
+      SnapshotInfo previousToPrevSnapshot,
+      OmSnapshot currOmSnapshot,
+      Table<String, OmKeyInfo> previousKeyTable,
+      Table<String, String> prevRenamedTable,
+      Table<String, OmKeyInfo> previousToPrevKeyTable,
+      String dbBucketKeyForDir) throws IOException {
+
+    Table<String, OmDirectoryInfo> snapDirTable =
+        currOmSnapshot.getMetadataManager().getDirectoryTable();
+    Table<String, String> snapRenamedTable =
+        currOmSnapshot.getMetadataManager().getSnapshotRenamedTable();
+
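+    // Iterative DFS over the deleted directory tree. Each stack node keeps
+    // a seek cursor (subDirSeek) so the shared directory iterator can resume
+    // where that node left off after a child directory has been processed.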
+    Stack<StackNode> stackNodes = new Stack<>();
+    OmDirectoryInfo omDeletedDirectoryInfo =
+        getDirectoryInfo(deletedDirInfo.getValue());
+    String dirPathDbKey = currOmSnapshot.getMetadataManager()
+        .getOzonePathKey(volumeId, bucketInfo.getObjectID(),
+            omDeletedDirectoryInfo);
+    // Stack Init
+    StackNode topLevelDir = new StackNode();
+    topLevelDir.setDirKey(dirPathDbKey);
+    topLevelDir.setDirValue(omDeletedDirectoryInfo);
+    stackNodes.push(topLevelDir);
+
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
+            directoryIterator = snapDirTable.iterator(dbBucketKeyForDir)) {
+
+      while (!stackNodes.isEmpty()) {
+        StackNode stackTop = stackNodes.peek();
+        // First process all the files in the current directory
+          // and then DFS into its sub-directories.
+        if (StringUtils.isEmpty(stackTop.getSubDirSeek())) {
+          processFilesUnderDir(previousSnapshot,
+              previousToPrevSnapshot,
+              volumeId,
+              bucketInfo,
+              stackTop.getDirValue(),
+              currOmSnapshot.getMetadataManager(),
+              snapRenamedTable,
+              previousKeyTable,
+              prevRenamedTable,
+              previousToPrevKeyTable);
+          // Format : /volId/bucketId/parentId/
+          String seekDirInDB = currOmSnapshot.getMetadataManager()
+              .getOzonePathKey(volumeId, bucketInfo.getObjectID(),
+                  stackTop.getDirValue().getObjectID(), "");
+          stackTop.setSubDirSeek(seekDirInDB);
+        } else {
+          // Adding \0 to seek the next greater element.
+          directoryIterator.seek(stackTop.getSubDirSeek() + "\0");
+          if (directoryIterator.hasNext()) {
+
+            Table.KeyValue<String, OmDirectoryInfo> deletedSubDirInfo =
+                directoryIterator.next();
+            String deletedSubDirKey = deletedSubDirInfo.getKey();
+            String prefixCheck = currOmSnapshot.getMetadataManager()
+                .getOzoneDeletePathDirKey(stackTop.getSubDirSeek());
+            // Pop the directory once the iterator leaves its prefix scope.
+            if (!deletedSubDirKey.startsWith(prefixCheck)) {
+              stackNodes.pop();
+            } else {
+              stackTop.setSubDirSeek(deletedSubDirKey);
+              StackNode nextSubDir = new StackNode();
+              nextSubDir.setDirKey(deletedSubDirInfo.getKey());
+              nextSubDir.setDirValue(deletedSubDirInfo.getValue());
+              stackNodes.push(nextSubDir);
+            }
+          } else {
+            stackNodes.pop();
+          }
+        }
+      }
+    }
+  }
+
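+  // Persists the accumulated exclusive sizes of the previous snapshot via a
+  // SetSnapshotProperty request and drops the in-memory totals for that key.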
+  private void updateExclusiveSize(String prevSnapshotKeyTable) {
+    ClientId clientId = ClientId.randomId();
+    SnapshotSize snapshotSize = SnapshotSize.newBuilder()
+            .setExclusiveSize(
+                exclusiveSizeMap.getOrDefault(prevSnapshotKeyTable, 0L))
+            .setExclusiveReplicatedSize(
+                exclusiveReplicatedSizeMap.getOrDefault(
+                    prevSnapshotKeyTable, 0L))
+            .build();
+    exclusiveSizeMap.remove(prevSnapshotKeyTable);
+    exclusiveReplicatedSizeMap.remove(prevSnapshotKeyTable);
+    SetSnapshotPropertyRequest
+        setSnapshotPropertyRequest =
+        SetSnapshotPropertyRequest.newBuilder()
+            .setSnapshotKey(prevSnapshotKeyTable)
+            .setSnapshotSize(snapshotSize)
+            .build();
+
+    OMRequest omRequest = OMRequest.newBuilder()
+            .setCmdType(Type.SetSnapshotProperty)
+            .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
+            .setClientId(clientId.toString())
+            .build();
+
+    submitRequest(omRequest, clientId);
+  }
+
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  private void processFilesUnderDir(
+      SnapshotInfo previousSnapshot,
+      SnapshotInfo previousToPrevSnapshot,
+      long volumeId,
+      OmBucketInfo bucketInfo,
+      OmDirectoryInfo parentInfo,
+      OMMetadataManager metadataManager,
+      Table<String, String> snapRenamedTable,
+      Table<String, OmKeyInfo> previousKeyTable,
+      Table<String, String> prevRenamedTable,
+      Table<String, OmKeyInfo> previousToPrevKeyTable)
+        throws IOException {
+    String seekFileInDB = metadataManager.getOzonePathKey(volumeId,
+        bucketInfo.getObjectID(),
+        parentInfo.getObjectID(), "");
+
+    Table<String, OmKeyInfo> fileTable = metadataManager.getFileTable();
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+             iterator = fileTable.iterator(seekFileInDB)) {
+
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, OmKeyInfo> entry = iterator.next();
+        OmKeyInfo fileInfo = entry.getValue();
+        if (!OMFileRequest.isImmediateChild(fileInfo.getParentObjectID(),
+            parentInfo.getObjectID())) {
+          break;
+        }
+
+        String ozoneDeletePathKey = metadataManager
+            .getOzoneDeletePathKey(fileInfo.getObjectID(), entry.getKey());
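+        // Reclaim the file's blocks if the previous snapshot does not
+        // reference it (directly or through a rename); otherwise account
+        // its size towards the previous snapshot's exclusive size.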
+        if (isKeyReclaimable(previousKeyTable, snapRenamedTable,
+            fileInfo, bucketInfo, volumeId, null)) {
+          // Scoped per file so earlier files' blocks are not resent to SCM.
+          List<BlockGroup> blocksForKeyDelete = new ArrayList<>();
+          for (OmKeyLocationInfoGroup keyLocations :
+              fileInfo.getKeyLocationVersions()) {
+            List<BlockID> item = keyLocations.getLocationList().stream()
+                .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
+                .collect(Collectors.toList());
+            BlockGroup keyBlocks = BlockGroup.newBuilder()
+                .setKeyName(ozoneDeletePathKey)
+                .addAllBlockIDs(item)
+                .build();
+            blocksForKeyDelete.add(keyBlocks);
+          }
+          // TODO: Add Retry mechanism.
+          getScmClient().deleteKeyBlocks(blocksForKeyDelete);
+        } else if (previousSnapshot != null) {
+          calculateExclusiveSize(previousSnapshot, previousToPrevSnapshot,
+              fileInfo, bucketInfo, volumeId, snapRenamedTable,
+              previousKeyTable, prevRenamedTable, previousToPrevKeyTable,
+              exclusiveSizeMap, exclusiveReplicatedSizeMap);
+        }
+      }
+    }
+  }
+
+  private void updateDeepCleanSnapshotDir(String snapshotKeyTable) {
+    ClientId clientId = ClientId.randomId();
+    SetSnapshotPropertyRequest setSnapshotPropertyRequest =
+        SetSnapshotPropertyRequest.newBuilder()
+            .setSnapshotKey(snapshotKeyTable)
+            .setDeepCleanedDeletedDir(true)
+            .build();
+
+    OMRequest omRequest = OMRequest.newBuilder()
+            .setCmdType(Type.SetSnapshotProperty)
+            .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
+            .setClientId(clientId.toString())
+            .build();
+
+    submitRequest(omRequest, clientId);
+  }
+
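+  // Submits the request through Ratis when enabled so the update is
+  // replicated across the OM quorum; otherwise applies it directly.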
+  public void submitRequest(OMRequest omRequest, ClientId clientId) {
+    try {
+      if (isRatisEnabled()) {
+        OzoneManagerRatisServer server =
+            getOzoneManager().getOmRatisServer();
+
+        RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+            .setClientId(clientId)
+            .setServerId(server.getRaftPeerId())
+            .setGroupId(server.getRaftGroupId())
+            .setCallId(getRunCount().get())
+            .setMessage(Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+            .setType(RaftClientRequest.writeRequestType())
+            .build();
+
+        server.submitRequest(omRequest, raftClientRequest);
+      } else {
+        getOzoneManager().getOmServerProtocol()
+            .submitRequest(null, omRequest);
+      }
+    } catch (ServiceException e) {
+      LOG.error("Snapshot deep cleaning request failed. " +
+          "Will retry at next run.", e);
+    }
+  }
+
+  /**
+   * Stack node data for directory deep clean for snapshot.
+   */
+  private static class StackNode {
+    private String dirKey;
+    private OmDirectoryInfo dirValue;
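+    // Last sub-directory key seeked under this directory; empty until the
+    // directory's immediate files have been processed.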
+    private String subDirSeek;
+
+    public String getDirKey() {
+      return dirKey;
+    }
+
+    public void setDirKey(String dirKey) {
+      this.dirKey = dirKey;
+    }
+
+    public OmDirectoryInfo getDirValue() {
+      return dirValue;
+    }
+
+    public void setDirValue(OmDirectoryInfo dirValue) {
+      this.dirValue = dirValue;
+    }
+
+    public String getSubDirSeek() {
+      return subDirSeek;
+    }
+
+    public void setSubDirSeek(String subDirSeek) {
+      this.subDirSeek = subDirSeek;
+    }
+
+    @Override
+    public String toString() {
+      return "StackNode{" +
+          "dirKey='" + dirKey + '\'' +
+          ", dirObjectId=" + dirValue.getObjectID() +
+          ", subDirSeek='" + subDirSeek + '\'' +
+          '}';
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotSetPropertyRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotSetPropertyRequestAndResponse.java
index 3bff9645c3..3856a5b62f 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotSetPropertyRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotSetPropertyRequestAndResponse.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotSetPropertyRespons
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotProperty;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
@@ -150,14 +150,14 @@ public class TestOMSnapshotSetPropertyRequestAndResponse {
              iterator = omMetadataManager.getSnapshotInfoTable().iterator()) {
       while (iterator.hasNext()) {
         String snapDbKey = iterator.next().getKey();
-        SnapshotProperty snapshotSize = SnapshotProperty.newBuilder()
-            .setSnapshotKey(snapDbKey)
+        SnapshotSize snapshotSize = SnapshotSize.newBuilder()
             .setExclusiveSize(exclusiveSize)
             .setExclusiveReplicatedSize(exclusiveSizeAfterRepl)
             .build();
         SetSnapshotPropertyRequest snapshotUpdateSizeRequest =
             SetSnapshotPropertyRequest.newBuilder()
-                .setSnapshotProperty(snapshotSize)
+                .setSnapshotKey(snapDbKey)
+                .setSnapshotSize(snapshotSize)
                 .build();
 
         OMRequest omRequest = OMRequest.newBuilder()
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index df27bae1ba..835ec65272 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -121,7 +121,7 @@ public class TestKeyDeletingService {
     }
     System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
     ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
-    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1000,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
         100, TimeUnit.MILLISECONDS);
@@ -207,7 +207,7 @@ public class TestKeyDeletingService {
     // Make sure that we have run the background thread 5 times more
     GenericTestUtils.waitFor(
         () -> keyDeletingService.getRunCount().get() >= 5,
-        100, 1000);
+        100, 10000);
     // Since SCM calls are failing, deletedKeyCount should be zero.
     assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
     assertEquals(keyCount, keyManager
@@ -544,7 +544,7 @@
     // Create Snap3, traps all the deleted keys.
     writeClient.createSnapshot(volumeName, bucketName, "snap3");
     assertTableRowCount(snapshotInfoTable, 3, metadataManager);
-    checkSnapDeepCleanStatus(snapshotInfoTable, true);
+    checkSnapDeepCleanStatus(snapshotInfoTable, false);
 
     keyDeletingService.resume();
 
@@ -564,9 +564,8 @@
 
       assertTableRowCount(snap3deletedTable, 0, metadataManager);
       assertTableRowCount(deletedTable, 0, metadataManager);
-      checkSnapDeepCleanStatus(snapshotInfoTable, false);
+      checkSnapDeepCleanStatus(snapshotInfoTable, true);
     }
-
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
