This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bf7135925 Revert "HDDS-9426. Calculate Exclusive size for deep cleaned snapshot's deleted directories. (#5579)" (#6051)
2bf7135925 is described below

commit 2bf713592575796a2e4a07f3cbdd800fcca9f776
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Jan 22 19:01:22 2024 +0100

    Revert "HDDS-9426. Calculate Exclusive size for deep cleaned snapshot's 
deleted directories. (#5579)" (#6051)
    
    Reason for revert: incompatible proto changes
    
    This reverts commit 05284942fa5ec8026de0618f61af9b7cd758a90b.
    This reverts commit d969689e83241e2d5bec2c878324ee1ce84c0fe4.
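
    The incompatibility the revert cites is visible in the
    OmClientProtocol.proto hunk further below: HDDS-9426 had changed field 1
    of SetSnapshotPropertyRequest from the message-typed snapshotProperty to
    the scalar snapshotKey. A minimal sketch of the two shapes, with message
    and field names taken from the diff (the wire-format reasoning is an
    inference, not stated in the patch):

        // Shape restored by this revert: the key and the exclusive-size
        // counters travel together inside SnapshotProperty.
        message SetSnapshotPropertyRequest {
          optional SnapshotProperty snapshotProperty = 1;  // field 1: message
        }

        message SnapshotProperty {
          optional string snapshotKey = 1;
          optional uint64 exclusiveSize = 2;
          optional uint64 exclusiveReplicatedSize = 3;
        }

        // Shape introduced by HDDS-9426 and removed here:
        //   optional string snapshotKey = 1;              // field 1: string
        //   optional SnapshotSize snapshotSize = 2;
        //   optional bool deepCleanedDeletedDir = 3;
        //   optional bool deepCleanedDeletedKey = 4;

        // Both variants encode field 1 with wire type 2 (length-delimited),
        // so a peer running the other schema would parse the field 1 payload
        // under the wrong type and fail or mis-decode, rather than cleanly
        // detecting a version mismatch in a mixed-version cluster.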
---
 .../common/src/main/resources/ozone-default.xml    |  19 -
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  12 -
 .../hadoop/ozone/om/helpers/SnapshotInfo.java      |  44 +-
 .../ozone/om/helpers/TestOmSnapshotInfo.java       |  11 +-
 .../om/TestSnapshotDirectoryCleaningService.java   | 272 -----------
 .../src/main/proto/OmClientProtocol.proto          |  10 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   7 -
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  30 --
 .../request/snapshot/OMSnapshotPurgeRequest.java   |  21 +-
 .../snapshot/OMSnapshotSetPropertyRequest.java     |  35 +-
 .../om/service/AbstractKeyDeletingService.java     | 100 ----
 .../ozone/om/service/KeyDeletingService.java       | 151 ++++--
 .../service/SnapshotDirectoryCleaningService.java  | 515 ---------------------
 ...estOMSnapshotSetPropertyRequestAndResponse.java |   8 +-
 .../ozone/om/service/TestKeyDeletingService.java   |  10 +-
 15 files changed, 158 insertions(+), 1087 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 6a29dc81ec..7debcd479e 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3600,25 +3600,6 @@
     </description>
   </property>
 
-  <property>
-    <name>ozone.snapshot.directory.service.timeout</name>
-    <value>300s</value>
-    <tag>OZONE, PERFORMANCE, OM</tag>
-    <description>
-      Timeout value for SnapshotDirectoryCleaningService.
-    </description>
-  </property>
-
-  <property>
-    <name>ozone.snapshot.directory.service.interval</name>
-    <value>24h</value>
-    <tag>OZONE, PERFORMANCE, OM</tag>
-    <description>
-      The time interval between successive SnapshotDirectoryCleaningService
-      thread run.
-    </description>
-  </property>
-
   <property>
     <name>ozone.scm.event.ContainerReport.thread.pool.size</name>
     <value>10</value>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 5dd7579eb9..58f341b74a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -390,18 +390,6 @@ public final class OMConfigKeys {
   public static final String OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT
       = "60s";
 
-  /**
-   * Configuration properties for Snapshot Directory Service.
-   */
-  public static final String OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL =
-      "ozone.snapshot.directory.service.interval";
-  public static final String OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL_DEFAULT
-      = "24h";
-  public static final String OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT =
-      "ozone.snapshot.directory.service.timeout";
-  public static final String
-      OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT = "300s";
-
   public static final String OZONE_PATH_DELETING_LIMIT_PER_TASK =
       "ozone.path.deleting.limit.per.task";
   // default is 6000 taking account of 32MB buffer size, and assuming
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
index 56103ccb3a..8ee9c6ee1f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
@@ -123,7 +123,6 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
   private long referencedReplicatedSize;
   private long exclusiveSize;
   private long exclusiveReplicatedSize;
-  private boolean deepCleanedDeletedDir;
 
   /**
    * Private constructor, constructed via builder.
@@ -163,8 +162,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
                        long referencedSize,
                        long referencedReplicatedSize,
                        long exclusiveSize,
-                       long exclusiveReplicatedSize,
-                       boolean deepCleanedDeletedDir) {
+                       long exclusiveReplicatedSize) {
     this.snapshotId = snapshotId;
     this.name = name;
     this.volumeName = volumeName;
@@ -183,7 +181,6 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
     this.referencedReplicatedSize = referencedReplicatedSize;
     this.exclusiveSize = exclusiveSize;
     this.exclusiveReplicatedSize = exclusiveReplicatedSize;
-    this.deepCleanedDeletedDir = deepCleanedDeletedDir;
   }
 
   public void setName(String name) {
@@ -288,7 +285,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
   }
 
   public SnapshotInfo.Builder toBuilder() {
-    return new Builder()
+    return new SnapshotInfo.Builder()
         .setSnapshotId(snapshotId)
         .setName(name)
         .setVolumeName(volumeName)
@@ -305,8 +302,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         .setReferencedSize(referencedSize)
         .setReferencedReplicatedSize(referencedReplicatedSize)
         .setExclusiveSize(exclusiveSize)
-        .setExclusiveReplicatedSize(exclusiveReplicatedSize)
-        .setDeepCleanedDeletedDir(deepCleanedDeletedDir);
+        .setExclusiveReplicatedSize(exclusiveReplicatedSize);
   }
 
   /**
@@ -331,7 +327,6 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
     private long referencedReplicatedSize;
     private long exclusiveSize;
     private long exclusiveReplicatedSize;
-    private boolean deepCleanedDeletedDir;
 
     public Builder() {
       // default values
@@ -428,11 +423,6 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
       return this;
     }
 
-    public Builder setDeepCleanedDeletedDir(boolean deepCleanedDeletedDir) {
-      this.deepCleanedDeletedDir = deepCleanedDeletedDir;
-      return this;
-    }
-
     public SnapshotInfo build() {
       Preconditions.checkNotNull(name);
       return new SnapshotInfo(
@@ -453,8 +443,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
           referencedSize,
           referencedReplicatedSize,
           exclusiveSize,
-          exclusiveReplicatedSize,
-          deepCleanedDeletedDir
+          exclusiveReplicatedSize
       );
     }
   }
@@ -476,8 +465,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
             .setReferencedSize(referencedSize)
             .setReferencedReplicatedSize(referencedReplicatedSize)
             .setExclusiveSize(exclusiveSize)
-            .setExclusiveReplicatedSize(exclusiveReplicatedSize)
-            .setDeepCleanedDeletedDir(deepCleanedDeletedDir);
+            .setExclusiveReplicatedSize(exclusiveReplicatedSize);
 
     if (pathPreviousSnapshotId != null) {
       sib.setPathPreviousSnapshotID(toProtobuf(pathPreviousSnapshotId));
@@ -550,11 +538,6 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
           snapshotInfoProto.getExclusiveReplicatedSize());
     }
 
-    if (snapshotInfoProto.hasDeepCleanedDeletedDir()) {
-      osib.setDeepCleanedDeletedDir(
-          snapshotInfoProto.getDeepCleanedDeletedDir());
-    }
-
     osib.setSnapshotPath(snapshotInfoProto.getSnapshotPath())
         .setCheckpointDir(snapshotInfoProto.getCheckpointDir())
         .setDbTxSequenceNumber(snapshotInfoProto.getDbTxSequenceNumber());
@@ -639,14 +622,6 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
     return exclusiveReplicatedSize;
   }
 
-  public boolean getDeepCleanedDeletedDir() {
-    return deepCleanedDeletedDir;
-  }
-
-  public void setDeepCleanedDeletedDir(boolean deepCleanedDeletedDir) {
-    this.deepCleanedDeletedDir = deepCleanedDeletedDir;
-  }
-
   /**
    * Generate default name of snapshot, (used if user doesn't provide one).
    */
@@ -680,8 +655,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         .setSnapshotPath(volumeName + OM_KEY_PREFIX + bucketName)
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .setDeepClean(false)
-        .setDeepCleanedDeletedDir(false);
+        .setDeepClean(true);
 
     if (snapshotId != null) {
       builder.setCheckpointDir(getCheckpointDirName(snapshotId));
@@ -714,8 +688,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         referencedSize == that.referencedSize &&
         referencedReplicatedSize == that.referencedReplicatedSize &&
         exclusiveSize == that.exclusiveSize &&
-        exclusiveReplicatedSize == that.exclusiveReplicatedSize &&
-        deepCleanedDeletedDir == that.deepCleanedDeletedDir;
+        exclusiveReplicatedSize == that.exclusiveReplicatedSize;
   }
 
   @Override
@@ -726,7 +699,7 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         globalPreviousSnapshotId, snapshotPath, checkpointDir,
         deepClean, sstFiltered,
         referencedSize, referencedReplicatedSize,
-        exclusiveSize, exclusiveReplicatedSize, deepCleanedDeletedDir);
+        exclusiveSize, exclusiveReplicatedSize);
   }
 
   /**
@@ -753,7 +726,6 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
         .setReferencedReplicatedSize(referencedReplicatedSize)
         .setExclusiveSize(exclusiveSize)
         .setExclusiveReplicatedSize(exclusiveReplicatedSize)
-        .setDeepCleanedDeletedDir(deepCleanedDeletedDir)
         .build();
   }
 }
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
index dd9cf34c8e..6dc3f913d0 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
@@ -66,13 +66,12 @@ public class TestOmSnapshotInfo {
         .setSnapshotPath(SNAPSHOT_PATH)
         .setCheckpointDir(CHECKPOINT_DIR)
         .setDbTxSequenceNumber(DB_TX_SEQUENCE_NUMBER)
-        .setDeepClean(false)
+        .setDeepClean(true)
         .setSstFiltered(false)
         .setReferencedSize(2000L)
         .setReferencedReplicatedSize(6000L)
         .setExclusiveSize(1000L)
         .setExclusiveReplicatedSize(3000L)
-        .setDeepCleanedDeletedDir(false)
         .build();
   }
 
@@ -90,13 +89,12 @@ public class TestOmSnapshotInfo {
         .setSnapshotPath(SNAPSHOT_PATH)
         .setCheckpointDir(CHECKPOINT_DIR)
         .setDbTxSequenceNumber(DB_TX_SEQUENCE_NUMBER)
-        .setDeepClean(false)
+        .setDeepClean(true)
         .setSstFiltered(false)
         .setReferencedSize(2000L)
         .setReferencedReplicatedSize(6000L)
         .setExclusiveSize(1000L)
         .setExclusiveReplicatedSize(3000L)
-        .setDeepCleanedDeletedDir(false)
         .build();
   }
 
@@ -142,9 +140,6 @@ public class TestOmSnapshotInfo {
     assertEquals(
         snapshotInfoEntryExpected.getExclusiveReplicatedSize(),
         snapshotInfoEntryActual.getExclusiveReplicatedSize());
-    assertEquals(
-        snapshotInfoEntryExpected.getDeepCleanedDeletedDir(),
-        snapshotInfoEntryActual.getDeepCleanedDeletedDir());
 
     assertEquals(snapshotInfoEntryExpected, snapshotInfoEntryActual);
   }
@@ -181,8 +176,6 @@ public class TestOmSnapshotInfo {
         snapshotInfoActual.getExclusiveSize());
     assertEquals(snapshotInfoExpected.getExclusiveReplicatedSize(),
         snapshotInfoActual.getExclusiveReplicatedSize());
-    assertEquals(snapshotInfoExpected.getDeepCleanedDeletedDir(),
-        snapshotInfoActual.getDeepCleanedDeletedDir());
 
     assertEquals(snapshotInfoExpected, snapshotInfoActual);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDirectoryCleaningService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDirectoryCleaningService.java
deleted file mode 100644
index 6b39b76c54..0000000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDirectoryCleaningService.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.om;
-
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.TestDataUtil;
-import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.service.SnapshotDirectoryCleaningService;
-import org.apache.ozone.test.GenericTestUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/**
- * Test Snapshot Directory Service.
- */
-@Timeout(300)
-public class TestSnapshotDirectoryCleaningService {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestSnapshotDirectoryCleaningService.class);
-
-  private static MiniOzoneCluster cluster;
-  private static FileSystem fs;
-  private static String volumeName;
-  private static String bucketName;
-  private static OzoneClient client;
-
-  @BeforeAll
-  public static void init() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL, 2500);
-    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 2500,
-        TimeUnit.MILLISECONDS);
-    conf.setBoolean(OZONE_ACL_ENABLED, true);
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(3)
-        .build();
-    cluster.waitForClusterToBeReady();
-    client = cluster.newClient();
-
-    // create a volume and a bucket to be used by OzoneFileSystem
-    OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client,
-        BucketLayout.FILE_SYSTEM_OPTIMIZED);
-    volumeName = bucket.getVolumeName();
-    bucketName = bucket.getName();
-
-    String rootPath = String.format("%s://%s.%s/",
-        OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName);
-
-    // Set the fs.defaultFS and start the filesystem
-    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
-    // Set the number of keys to be processed during batch operate.
-    conf.setInt(OZONE_FS_ITERATE_BATCH_SIZE, 5);
-
-    fs = FileSystem.get(conf);
-  }
-
-  @AfterAll
-  public static void teardown() {
-    IOUtils.closeQuietly(client);
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-    IOUtils.closeQuietly(fs);
-  }
-
-  @AfterEach
-  public void cleanup() {
-    try {
-      Path root = new Path("/");
-      FileStatus[] fileStatuses = fs.listStatus(root);
-      for (FileStatus fileStatus : fileStatuses) {
-        fs.delete(fileStatus.getPath(), true);
-      }
-    } catch (IOException ex) {
-      fail("Failed to cleanup files.");
-    }
-  }
-
-  @SuppressWarnings("checkstyle:LineLength")
-  @Test
-  public void testExclusiveSizeWithDirectoryDeepClean() throws Exception {
-
-    Table<String, OmKeyInfo> deletedDirTable =
-        cluster.getOzoneManager().getMetadataManager().getDeletedDirTable();
-    Table<String, OmKeyInfo> keyTable =
-        cluster.getOzoneManager().getMetadataManager()
-            .getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED);
-    Table<String, OmDirectoryInfo> dirTable =
-        cluster.getOzoneManager().getMetadataManager().getDirectoryTable();
-    Table<String, RepeatedOmKeyInfo> deletedKeyTable =
-        cluster.getOzoneManager().getMetadataManager().getDeletedTable();
-    Table<String, SnapshotInfo> snapshotInfoTable =
-        cluster.getOzoneManager().getMetadataManager().getSnapshotInfoTable();
-    SnapshotDirectoryCleaningService snapshotDirectoryCleaningService =
-        cluster.getOzoneManager().getKeyManager().getSnapshotDirectoryService();
-
-    /*    DirTable
-    /v/b/snapDir
-    /v/b/snapDir/appRoot0-2/
-    /v/b/snapDir/appRoot0-2/parentDir0-2/
-          FileTable
-    /v/b/snapDir/testKey0 - testKey4  = 5 keys
-    /v/b/snapDir/appRoot0-2/parentDir0-2/childFile = 9 keys
-    /v/b/snapDir/appRoot0/parentDir0-2/childFile0-4 = 15 keys
-     */
-
-    Path root = new Path("/snapDir");
-    // Create parent dir from root.
-    fs.mkdirs(root);
-
-    // Add 5 files inside root dir
-    // Creates /v/b/snapDir/testKey0 - testKey4
-    for (int i = 0; i < 5; i++) {
-      Path path = new Path(root, "testKey" + i);
-      try (FSDataOutputStream stream = fs.create(path)) {
-        stream.write(1);
-      }
-    }
-
-    // Creates /v/b/snapDir/appRoot0-2/parentDir0-2/childFile
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 3; j++) {
-        Path appRoot = new Path(root, "appRoot" + j);
-        Path parent = new Path(appRoot, "parentDir" + i);
-        Path child = new Path(parent, "childFile");
-        try (FSDataOutputStream stream = fs.create(child)) {
-          stream.write(1);
-        }
-      }
-    }
-
-    assertTableRowCount(keyTable, 14);
-    assertTableRowCount(dirTable, 13);
-    // Create snapshot
-    client.getObjectStore().createSnapshot(volumeName, bucketName, "snap1");
-
-    // Creates /v/b/snapDir/appRoot0/parentDir0-2/childFile0-4
-    for (int i = 0; i < 3; i++) {
-      Path appRoot = new Path(root, "appRoot0");
-      Path parent = new Path(appRoot, "parentDir" + i);
-      for (int j = 0; j < 5; j++) {
-        Path child = new Path(parent, "childFile" + j);
-        try (FSDataOutputStream stream = fs.create(child)) {
-          stream.write(1);
-        }
-      }
-    }
-
-    for (int i = 5; i < 10; i++) {
-      Path path = new Path(root, "testKey" + i);
-      try (FSDataOutputStream stream = fs.create(path)) {
-        stream.write(1);
-      }
-    }
-
-    assertTableRowCount(deletedDirTable, 0);
-    assertTableRowCount(keyTable, 34);
-    assertTableRowCount(dirTable, 13);
-    Path appRoot0 = new Path(root, "appRoot0");
-    // Only parentDir0-2/childFile under appRoot0 is exclusive for snap1
-    fs.delete(appRoot0, true);
-    assertTableRowCount(deletedDirTable, 1);
-    client.getObjectStore().createSnapshot(volumeName, bucketName, "snap2");
-
-    // Delete testKey0-9
-    for (int i = 0; i < 10; i++) {
-      Path testKey = new Path(root, "testKey" + i);
-      fs.delete(testKey, false);
-    }
-
-    fs.delete(root, true);
-    assertTableRowCount(deletedKeyTable, 10);
-    client.getObjectStore().createSnapshot(volumeName, bucketName, "snap3");
-    long prevRunCount = snapshotDirectoryCleaningService.getRunCount().get();
-    GenericTestUtils.waitFor(() -> snapshotDirectoryCleaningService.getRunCount().get()
-        > prevRunCount + 1, 100, 10000);
-
-    Thread.sleep(2000);
-    Map<String, Long> expectedSize = new HashMap<String, Long>() {{
-      // /v/b/snapDir/appRoot0/parentDir0-2/childFile contribute
-      // exclusive size, /v/b/snapDir/appRoot0/parentDir0-2/childFile0-4
-      // are deep cleaned and hence don't contribute to size.
-        put("snap1", 3L);
-      // Only testKey5-9 contribute to the exclusive size
-        put("snap2", 5L);
-        put("snap3", 0L);
-      }};
-    Thread.sleep(500);
-    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
-        iterator = snapshotInfoTable.iterator()) {
-      while (iterator.hasNext()) {
-        Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
-        String snapshotName = snapshotEntry.getValue().getName();
-        assertEquals(expectedSize.get(snapshotName), snapshotEntry.getValue().
-            getExclusiveSize());
-        // Since for the test we are using RATIS/THREE
-        assertEquals(expectedSize.get(snapshotName) * 3,
-            snapshotEntry.getValue().getExclusiveReplicatedSize());
-
-      }
-    }
-  }
-
-  private void assertTableRowCount(Table<String, ?> table, int count)
-      throws TimeoutException, InterruptedException {
-    GenericTestUtils.waitFor(() -> assertTableRowCount(count, table), 1000,
-        120000); // 2 minutes
-  }
-
-  private boolean assertTableRowCount(int expectedCount,
-                                      Table<String, ?> table) {
-    long count = 0L;
-    try {
-      count = cluster.getOzoneManager().getMetadataManager()
-          .countRowsInTable(table);
-      LOG.info("{} actual row count={}, expectedCount={}", table.getName(),
-          count, expectedCount);
-    } catch (IOException ex) {
-      fail("testDoubleBuffer failed with: " + ex);
-    }
-    return count == expectedCount;
-  }
-}
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index e49f23b115..fd83981507 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -851,7 +851,6 @@ message SnapshotInfo {
   // snapshot exclusive size after replication
   optional uint64 exclusiveReplicatedSize = 18;
   // note: shared sizes can be calculated from: referenced - exclusive
-  optional bool deepCleanedDeletedDir = 19;
 }
 
 message SnapshotDiffJobProto {
@@ -1892,16 +1891,15 @@ message SnapshotMoveKeyInfos {
 
 message SnapshotPurgeRequest {
   repeated string snapshotDBKeys = 1;
+  repeated string updatedSnapshotDBKey = 2;
 }
 
 message SetSnapshotPropertyRequest {
-  optional string snapshotKey = 1;
-  optional SnapshotSize snapshotSize = 2;
-  optional bool deepCleanedDeletedDir = 3;
-  optional bool deepCleanedDeletedKey = 4;
+  optional SnapshotProperty snapshotProperty = 1;
 }
 
-message SnapshotSize {
+message SnapshotProperty {
+  optional string snapshotKey = 1;
   optional uint64 exclusiveSize = 2;
   optional uint64 exclusiveReplicatedSize = 3;
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 4378701426..0fe1cdbe80 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.service.KeyDeletingService;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
-import org.apache.hadoop.ozone.om.service.SnapshotDirectoryCleaningService;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 
 import java.io.IOException;
@@ -286,10 +285,4 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    * @return Background service.
    */
   SnapshotDeletingService getSnapshotDeletingService();
-
-  /**
-   * Returns the instance of Snapshot Directory service.
-   * @return Background service.
-   */
-  SnapshotDirectoryCleaningService getSnapshotDirectoryService();
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 9cfd404314..cac2aa53f6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.ozone.om.service.KeyDeletingService;
 import org.apache.hadoop.ozone.om.service.MultipartUploadCleanupService;
 import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
-import org.apache.hadoop.ozone.om.service.SnapshotDirectoryCleaningService;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
@@ -132,10 +131,6 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
@@ -186,7 +181,6 @@ public class KeyManagerImpl implements KeyManager {
 
   private BackgroundService openKeyCleanupService;
   private BackgroundService multipartUploadCleanupService;
-  private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
 
   public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
       OzoneConfiguration conf, OMPerformanceMetrics metrics) {
@@ -306,22 +300,6 @@ public class KeyManagerImpl implements KeyManager {
       }
     }
 
-    if (snapshotDirectoryCleaningService == null &&
-        ozoneManager.isFilesystemSnapshotEnabled()) {
-      long dirDeleteInterval = configuration.getTimeDuration(
-          OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL,
-          OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL_DEFAULT,
-          TimeUnit.MILLISECONDS);
-      long serviceTimeout = configuration.getTimeDuration(
-          OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT,
-          OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT,
-          TimeUnit.MILLISECONDS);
-      snapshotDirectoryCleaningService = new SnapshotDirectoryCleaningService(
-          dirDeleteInterval, TimeUnit.MILLISECONDS, serviceTimeout,
-          ozoneManager, scmClient.getBlockClient());
-      snapshotDirectoryCleaningService.start();
-    }
-
     if (multipartUploadCleanupService == null) {
       long serviceInterval = configuration.getTimeDuration(
           OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL,
@@ -368,10 +346,6 @@ public class KeyManagerImpl implements KeyManager {
       multipartUploadCleanupService.shutdown();
       multipartUploadCleanupService = null;
     }
-    if (snapshotDirectoryCleaningService != null) {
-      snapshotDirectoryCleaningService.shutdown();
-      snapshotDirectoryCleaningService = null;
-    }
   }
 
   private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -707,10 +681,6 @@ public class KeyManagerImpl implements KeyManager {
     return snapshotDeletingService;
   }
 
-  public SnapshotDirectoryCleaningService getSnapshotDirectoryService() {
-    return snapshotDirectoryCleaningService;
-  }
-
   public boolean isSstFilteringSvcEnabled() {
     long serviceInterval = ozoneManager.getConfiguration()
         .getTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
index b7dba82602..09711c7045 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
@@ -72,10 +72,22 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {
     try {
       List<String> snapshotDbKeys = snapshotPurgeRequest
           .getSnapshotDBKeysList();
+      List<String> snapInfosToUpdate = snapshotPurgeRequest
+          .getUpdatedSnapshotDBKeyList();
       Map<String, SnapshotInfo> updatedSnapInfos = new HashMap<>();
       Map<String, SnapshotInfo> updatedPathPreviousAndGlobalSnapshots =
           new HashMap<>();
 
+      // Snapshots that are already deepCleaned by the KeyDeletingService
+      // can be marked as deepCleaned.
+      for (String snapTableKey : snapInfosToUpdate) {
+        SnapshotInfo snapInfo = omMetadataManager.getSnapshotInfoTable()
+            .get(snapTableKey);
+
+        updateSnapshotInfoAndCache(snapInfo, omMetadataManager,
+            trxnLogIndex, updatedSnapInfos, false);
+      }
+
       // Snapshots that are purged by the SnapshotDeletingService
       // will update the next snapshot so that is can be deep cleaned
       // by the KeyDeletingService in the next run.
@@ -88,7 +100,7 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {
                 snapshotChainManager, omSnapshotManager);
 
         updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager,
-            trxnLogIndex, updatedSnapInfos);
+            trxnLogIndex, updatedSnapInfos, true);
         updateSnapshotChainAndCache(omMetadataManager, fromSnapshot,
             trxnLogIndex, updatedPathPreviousAndGlobalSnapshots);
         ozoneManager.getOmSnapshotManager().getSnapshotCache()
@@ -108,12 +120,9 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {
 
   private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
       OmMetadataManagerImpl omMetadataManager, long trxnLogIndex,
-      Map<String, SnapshotInfo> updatedSnapInfos) {
+      Map<String, SnapshotInfo> updatedSnapInfos, boolean deepClean) {
     if (snapInfo != null) {
-      // Setting next snapshot deep clean to false, Since the
-      // current snapshot is deleted. We can potentially
-      // reclaim more keys in the next snapshot.
-      snapInfo.setDeepClean(false);
+      snapInfo.setDeepClean(deepClean);
 
       // Update table cache first
       omMetadataManager.getSnapshotInfoTable().addCacheEntry(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotSetPropertyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotSetPropertyRequest.java
index b3dd5206c9..966a265af8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotSetPropertyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotSetPropertyRequest.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotSetPropertyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,10 +60,16 @@ public class OMSnapshotSetPropertyRequest extends OMClientRequest {
     OzoneManagerProtocolProtos.SetSnapshotPropertyRequest
         setSnapshotPropertyRequest = getOmRequest()
         .getSetSnapshotPropertyRequest();
+
+    SnapshotProperty snapshotProperty = setSnapshotPropertyRequest
+        .getSnapshotProperty();
     SnapshotInfo updatedSnapInfo = null;
 
     try {
-      String snapshotKey = setSnapshotPropertyRequest.getSnapshotKey();
+      String snapshotKey = snapshotProperty.getSnapshotKey();
+      long exclusiveSize = snapshotProperty.getExclusiveSize();
+      long exclusiveReplicatedSize = snapshotProperty
+          .getExclusiveReplicatedSize();
       updatedSnapInfo = metadataManager.getSnapshotInfoTable()
           .get(snapshotKey);
 
@@ -73,28 +79,9 @@ public class OMSnapshotSetPropertyRequest extends OMClientRequest {
             " is not found", INVALID_SNAPSHOT_ERROR);
       }
 
-      if (setSnapshotPropertyRequest.hasDeepCleanedDeletedDir()) {
-        updatedSnapInfo.setDeepCleanedDeletedDir(setSnapshotPropertyRequest
-            .getDeepCleanedDeletedDir());
-      }
-
-      if (setSnapshotPropertyRequest.hasDeepCleanedDeletedKey()) {
-        updatedSnapInfo.setDeepClean(setSnapshotPropertyRequest
-            .getDeepCleanedDeletedKey());
-      }
-
-      if (setSnapshotPropertyRequest.hasSnapshotSize()) {
-        SnapshotSize snapshotSize = setSnapshotPropertyRequest
-            .getSnapshotSize();
-        long exclusiveSize = updatedSnapInfo.getExclusiveSize() +
-            snapshotSize.getExclusiveSize();
-        long exclusiveReplicatedSize = updatedSnapInfo
-            .getExclusiveReplicatedSize() + snapshotSize
-            .getExclusiveReplicatedSize();
-        // Set Exclusive size.
-        updatedSnapInfo.setExclusiveSize(exclusiveSize);
-        updatedSnapInfo.setExclusiveReplicatedSize(exclusiveReplicatedSize);
-      }
+      // Set Exclusive size.
+      updatedSnapInfo.setExclusiveSize(exclusiveSize);
+      updatedSnapInfo.setExclusiveReplicatedSize(exclusiveReplicatedSize);
       // Update Table Cache
       metadataManager.getSnapshotInfoTable().addCacheEntry(
           new CacheKey<>(snapshotKey),
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 21ad087276..1091053ebd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -469,106 +469,6 @@ public abstract class AbstractKeyDeletingService extends BackgroundService
     return remainNum;
   }
 
-  /**
-   * To calculate Exclusive Size for current snapshot, Check
-   * the next snapshot deletedTable if the deleted key is
-   * referenced in current snapshot and not referenced in the
-   * previous snapshot then that key is exclusive to the current
-   * snapshot. Here since we are only iterating through
-   * deletedTable we can check the previous and previous to
-   * previous snapshot to achieve the same.
-   * previousSnapshot - Snapshot for which exclusive size is
-   *                    getting calculating.
-   * currSnapshot - Snapshot's deletedTable is used to calculate
-   *                previousSnapshot snapshot's exclusive size.
-   * previousToPrevSnapshot - Snapshot which is used to check
-   *                 if key is exclusive to previousSnapshot.
-   */
-  @SuppressWarnings("checkstyle:ParameterNumber")
-  public void calculateExclusiveSize(
-      SnapshotInfo previousSnapshot,
-      SnapshotInfo previousToPrevSnapshot,
-      OmKeyInfo keyInfo,
-      OmBucketInfo bucketInfo, long volumeId,
-      Table<String, String> snapRenamedTable,
-      Table<String, OmKeyInfo> previousKeyTable,
-      Table<String, String> prevRenamedTable,
-      Table<String, OmKeyInfo> previousToPrevKeyTable,
-      Map<String, Long> exclusiveSizeMap,
-      Map<String, Long> exclusiveReplicatedSizeMap) throws IOException {
-    String prevSnapKey = previousSnapshot.getTableKey();
-    long exclusiveReplicatedSize =
-        exclusiveReplicatedSizeMap.getOrDefault(
-            prevSnapKey, 0L) + keyInfo.getReplicatedSize();
-    long exclusiveSize = exclusiveSizeMap.getOrDefault(
-        prevSnapKey, 0L) + keyInfo.getDataSize();
-
-    // If there is no previous to previous snapshot, then
-    // the previous snapshot is the first snapshot.
-    if (previousToPrevSnapshot == null) {
-      exclusiveSizeMap.put(prevSnapKey, exclusiveSize);
-      exclusiveReplicatedSizeMap.put(prevSnapKey,
-          exclusiveReplicatedSize);
-    } else {
-      OmKeyInfo keyInfoPrevSnapshot = getPreviousSnapshotKeyName(
-          keyInfo, bucketInfo, volumeId,
-          snapRenamedTable, previousKeyTable);
-      OmKeyInfo keyInfoPrevToPrevSnapshot = getPreviousSnapshotKeyName(
-          keyInfoPrevSnapshot, bucketInfo, volumeId,
-          prevRenamedTable, previousToPrevKeyTable);
-      // If the previous to previous snapshot doesn't
-      // have the key, then it is exclusive size for the
-      // previous snapshot.
-      if (keyInfoPrevToPrevSnapshot == null) {
-        exclusiveSizeMap.put(prevSnapKey, exclusiveSize);
-        exclusiveReplicatedSizeMap.put(prevSnapKey,
-            exclusiveReplicatedSize);
-      }
-    }
-  }
-
-  private OmKeyInfo getPreviousSnapshotKeyName(
-      OmKeyInfo keyInfo, OmBucketInfo bucketInfo, long volumeId,
-      Table<String, String> snapRenamedTable,
-      Table<String, OmKeyInfo> previousKeyTable) throws IOException {
-
-    if (keyInfo == null) {
-      return null;
-    }
-
-    String dbKeyPrevSnap;
-    if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
-      dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzonePathKey(
-          volumeId,
-          bucketInfo.getObjectID(),
-          keyInfo.getParentObjectID(),
-          keyInfo.getFileName());
-    } else {
-      dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzoneKey(
-          keyInfo.getVolumeName(),
-          keyInfo.getBucketName(),
-          keyInfo.getKeyName());
-    }
-
-    String dbRenameKey = getOzoneManager().getMetadataManager().getRenameKey(
-        keyInfo.getVolumeName(),
-        keyInfo.getBucketName(),
-        keyInfo.getObjectID());
-
-    String renamedKey = snapRenamedTable.getIfExist(dbRenameKey);
-    OmKeyInfo prevKeyInfo = renamedKey != null ?
-        previousKeyTable.get(renamedKey) :
-        previousKeyTable.get(dbKeyPrevSnap);
-
-    if (prevKeyInfo == null ||
-        prevKeyInfo.getObjectID() != keyInfo.getObjectID()) {
-      return null;
-    }
-
-    return isBlockLocationInfoSame(prevKeyInfo, keyInfo) ?
-        prevKeyInfo : null;
-  }
-
   protected boolean isBufferLimitCrossed(
       int maxLimit, int cLimit, int increment) {
     return cLimit + increment >= maxLimit;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index e89608e82d..6dcc2544b4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -48,7 +48,8 @@ import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotProperty;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
@@ -97,7 +98,6 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
   private final Map<String, Long> exclusiveSizeMap;
   private final Map<String, Long> exclusiveReplicatedSizeMap;
   private final Set<String> completedExclusiveSizeSet;
-  private final Map<String, String> snapshotSeekMap;
 
   public KeyDeletingService(OzoneManager ozoneManager,
       ScmBlockLocationProtocol scmClient,
@@ -116,7 +116,6 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
     this.exclusiveSizeMap = new HashMap<>();
     this.exclusiveReplicatedSizeMap = new HashMap<>();
     this.completedExclusiveSizeSet = new HashSet<>();
-    this.snapshotSeekMap = new HashMap<>();
   }
 
   /**
@@ -259,8 +258,8 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
 
           // Deep clean only on active snapshot. Deleted Snapshots will be
           // cleaned up by SnapshotDeletingService.
-          if (currSnapInfo.getSnapshotStatus() != SNAPSHOT_ACTIVE ||
-              currSnapInfo.getDeepClean()) {
+          if (!currSnapInfo.getSnapshotStatus().equals(SNAPSHOT_ACTIVE) ||
+              !currSnapInfo.getDeepClean()) {
             continue;
           }
 
@@ -343,22 +342,11 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
                 RepeatedOmKeyInfo>> deletedIterator = snapDeletedTable
                 .iterator()) {
 
-              String lastKeyInCurrentRun = null;
-              String deletedTableSeek = snapshotSeekMap.getOrDefault(
-                  currSnapInfo.getTableKey(), snapshotBucketKey);
-              deletedIterator.seek(deletedTableSeek);
-              // To avoid processing the last key from the previous
-              // run again.
-              if (!deletedTableSeek.equals(snapshotBucketKey) &&
-                  deletedIterator.hasNext()) {
-                deletedIterator.next();
-              }
-
+              deletedIterator.seek(snapshotBucketKey);
               while (deletedIterator.hasNext() && delCount < keyLimitPerTask) {
                 Table.KeyValue<String, RepeatedOmKeyInfo>
                     deletedKeyValue = deletedIterator.next();
                 String deletedKey = deletedKeyValue.getKey();
-                lastKeyInCurrentRun = deletedKey;
 
                 // Exit if it is out of the bucket scope.
                 if (!deletedKey.startsWith(snapshotBucketKey)) {
@@ -378,8 +366,7 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
                     calculateExclusiveSize(previousSnapshot,
                         previousToPrevSnapshot, keyInfo, bucketInfo, volumeId,
                         snapRenamedTable, previousKeyTable, prevRenamedTable,
-                        previousToPrevKeyTable, exclusiveSizeMap,
-                        exclusiveReplicatedSizeMap);
+                        previousToPrevKeyTable);
                   }
 
                   if (isKeyReclaimable(previousKeyTable, snapRenamedTable,
@@ -419,15 +406,6 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
                   completedExclusiveSizeSet.add(
                       previousSnapshot.getTableKey());
                 }
-
-                snapshotSeekMap.remove(currSnapInfo.getTableKey());
-              } else {
-                // There are keys that still needs processing
-                // we can continue from it in the next iteration
-                if (lastKeyInCurrentRun != null) {
-                  snapshotSeekMap.put(currSnapInfo.getTableKey(),
-                      lastKeyInCurrentRun);
-                }
               }
 
               if (!keysToPurge.isEmpty()) {
@@ -442,8 +420,98 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
         }
       }
 
-      updateDeepCleanedSnapshots(deepCleanedSnapshots);
       updateSnapshotExclusiveSize();
+      updateDeepCleanedSnapshots(deepCleanedSnapshots);
+    }
+
+    /**
+     * To calculate Exclusive Size for current snapshot, Check
+     * the next snapshot deletedTable if the deleted key is
+     * referenced in current snapshot and not referenced in the
+     * previous snapshot then that key is exclusive to the current
+     * snapshot. Here since we are only iterating through
+     * deletedTable we can check the previous and previous to
+     * previous snapshot to achieve the same.
+     * previousSnapshot - Snapshot for which exclusive size is
+     *                    getting calculating.
+     * currSnapshot - Snapshot's deletedTable is used to calculate
+     *                previousSnapshot snapshot's exclusive size.
+     * previousToPrevSnapshot - Snapshot which is used to check
+     *                 if key is exclusive to previousSnapshot.
+     */
+    @SuppressWarnings("checkstyle:ParameterNumber")
+    private void calculateExclusiveSize(
+        SnapshotInfo previousSnapshot,
+        SnapshotInfo previousToPrevSnapshot,
+        OmKeyInfo keyInfo,
+        OmBucketInfo bucketInfo, long volumeId,
+        Table<String, String> snapRenamedTable,
+        Table<String, OmKeyInfo> previousKeyTable,
+        Table<String, String> prevRenamedTable,
+        Table<String, OmKeyInfo> previousToPrevKeyTable) throws IOException {
+      String prevSnapKey = previousSnapshot.getTableKey();
+      long exclusiveReplicatedSize =
+          exclusiveReplicatedSizeMap.getOrDefault(
+              prevSnapKey, 0L) + keyInfo.getReplicatedSize();
+      long exclusiveSize = exclusiveSizeMap.getOrDefault(
+          prevSnapKey, 0L) + keyInfo.getDataSize();
+
+      // If there is no previous to previous snapshot, then
+      // the previous snapshot is the first snapshot.
+      if (previousToPrevSnapshot == null) {
+        exclusiveSizeMap.put(prevSnapKey, exclusiveSize);
+        exclusiveReplicatedSizeMap.put(prevSnapKey,
+            exclusiveReplicatedSize);
+      } else {
+        OmKeyInfo keyInfoPrevSnapshot = getPreviousSnapshotKeyName(
+                keyInfo, bucketInfo, volumeId,
+                snapRenamedTable, previousKeyTable);
+        OmKeyInfo keyInfoPrevToPrevSnapshot = getPreviousSnapshotKeyName(
+                keyInfoPrevSnapshot, bucketInfo, volumeId,
+                prevRenamedTable, previousToPrevKeyTable);
+        // If the previous to previous snapshot doesn't
+        // have the key, then it is exclusive size for the
+        // previous snapshot.
+        if (keyInfoPrevToPrevSnapshot == null) {
+          exclusiveSizeMap.put(prevSnapKey, exclusiveSize);
+          exclusiveReplicatedSizeMap.put(prevSnapKey,
+              exclusiveReplicatedSize);
+        }
+      }
+    }
+
+    private OmKeyInfo getPreviousSnapshotKeyName(
+        OmKeyInfo keyInfo, OmBucketInfo bucketInfo, long volumeId,
+        Table<String, String> snapRenamedTable,
+        Table<String, OmKeyInfo> previousKeyTable) throws IOException {
+
+      if (keyInfo == null) {
+        return null;
+      }
+
+      String dbKeyPrevSnap;
+      if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+        dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzonePathKey(
+            volumeId,
+            bucketInfo.getObjectID(),
+            keyInfo.getParentObjectID(),
+            keyInfo.getFileName());
+      } else {
+        dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzoneKey(
+            keyInfo.getVolumeName(),
+            keyInfo.getBucketName(),
+            keyInfo.getKeyName());
+      }
+
+      String dbRenameKey = getOzoneManager().getMetadataManager().getRenameKey(
+          keyInfo.getVolumeName(),
+          keyInfo.getBucketName(),
+          keyInfo.getObjectID());
+
+      String renamedKey = snapRenamedTable.getIfExist(dbRenameKey);
+      dbKeyPrevSnap = renamedKey != null ? renamedKey : dbKeyPrevSnap;
+
+      return previousKeyTable.get(dbKeyPrevSnap);
     }
 
     private void updateSnapshotExclusiveSize() {
@@ -457,15 +525,15 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
       while (completedSnapshotIterator.hasNext()) {
         ClientId clientId = ClientId.randomId();
         String dbKey = completedSnapshotIterator.next();
-        SnapshotSize snapshotSize = SnapshotSize.newBuilder()
-                .setExclusiveSize(exclusiveSizeMap.getOrDefault(dbKey, 0L))
+        SnapshotProperty snapshotProperty = SnapshotProperty.newBuilder()
+                .setSnapshotKey(dbKey)
+                .setExclusiveSize(exclusiveSizeMap.get(dbKey))
                 .setExclusiveReplicatedSize(
-                    exclusiveReplicatedSizeMap.getOrDefault(dbKey, 0L))
+                    exclusiveReplicatedSizeMap.get(dbKey))
                 .build();
         SetSnapshotPropertyRequest setSnapshotPropertyRequest =
             SetSnapshotPropertyRequest.newBuilder()
-                .setSnapshotKey(dbKey)
-                .setSnapshotSize(snapshotSize)
+                .setSnapshotProperty(snapshotProperty)
                 .build();
 
         OMRequest omRequest = OMRequest.newBuilder()
@@ -481,17 +549,16 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
     }
 
     private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) {
-      for (String deepCleanedSnapshot: deepCleanedSnapshots) {
+      if (!deepCleanedSnapshots.isEmpty()) {
         ClientId clientId = ClientId.randomId();
-        SetSnapshotPropertyRequest setSnapshotPropertyRequest =
-            SetSnapshotPropertyRequest.newBuilder()
-                .setSnapshotKey(deepCleanedSnapshot)
-                .setDeepCleanedDeletedKey(true)
-                .build();
+        SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest
+            .newBuilder()
+            .addAllUpdatedSnapshotDBKey(deepCleanedSnapshots)
+            .build();
 
         OMRequest omRequest = OMRequest.newBuilder()
-            .setCmdType(Type.SetSnapshotProperty)
-            .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
+            .setCmdType(Type.SnapshotPurge)
+            .setSnapshotPurgeRequest(snapshotPurgeRequest)
             .setClientId(clientId.toString())
             .build();
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
deleted file mode 100644
index 9a60f63038..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.om.service;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.utils.BackgroundTask;
-import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
-import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
-import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
-import org.apache.hadoop.ozone.om.OmSnapshot;
-import org.apache.hadoop.ozone.om.OmSnapshotManager;
-import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
-import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
-import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
-import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
-import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.getDirectoryInfo;
-import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getOzonePathKeyForFso;
-
-/**
- * Snapshot BG Service for deleted directory deep clean and exclusive size
- * calculation for deleted directories.
- */
-public class SnapshotDirectoryCleaningService
-    extends AbstractKeyDeletingService {
-  // Use only a single thread for DirDeletion. Multiple threads would read
-  // or write to same tables and can send deletion requests for same key
-  // multiple times.
-  private static final int SNAPSHOT_DIR_CORE_POOL_SIZE = 1;
-
-  private final AtomicBoolean suspended;
-  private final Map<String, Long> exclusiveSizeMap;
-  private final Map<String, Long> exclusiveReplicatedSizeMap;
-
-  public SnapshotDirectoryCleaningService(long interval, TimeUnit unit,
-                                          long serviceTimeout,
-                                          OzoneManager ozoneManager,
-                                          ScmBlockLocationProtocol scmClient) {
-    super(SnapshotDirectoryCleaningService.class.getSimpleName(),
-        interval, unit, SNAPSHOT_DIR_CORE_POOL_SIZE, serviceTimeout,
-        ozoneManager, scmClient);
-    this.suspended = new AtomicBoolean(false);
-    this.exclusiveSizeMap = new HashMap<>();
-    this.exclusiveReplicatedSizeMap = new HashMap<>();
-  }
-
-  private boolean shouldRun() {
-    if (getOzoneManager() == null) {
-      // OzoneManager can be null for testing
-      return true;
-    }
-    return getOzoneManager().isLeaderReady() && !suspended.get();
-  }
-
-  /**
-   * Suspend the service.
-   */
-  @VisibleForTesting
-  public void suspend() {
-    suspended.set(true);
-  }
-
-  /**
-   * Resume the service if suspended.
-   */
-  @VisibleForTesting
-  public void resume() {
-    suspended.set(false);
-  }
-
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new SnapshotDirectoryCleaningService.SnapshotDirTask());
-    return queue;
-  }
-
-  private class SnapshotDirTask implements BackgroundTask {
-
-    @Override
-    public BackgroundTaskResult call() {
-      if (!shouldRun()) {
-        return BackgroundTaskResult.EmptyTaskResult.newResult();
-      }
-      LOG.debug("Running SnapshotDirectoryCleaningService");
-
-      getRunCount().incrementAndGet();
-      OmSnapshotManager omSnapshotManager =
-          getOzoneManager().getOmSnapshotManager();
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          getOzoneManager().getMetadataManager().getSnapshotInfoTable();
-      OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
-          getOzoneManager().getMetadataManager();
-      SnapshotChainManager snapChainManager = metadataManager
-          .getSnapshotChainManager();
-
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
-        while (iterator.hasNext()) {
-          SnapshotInfo currSnapInfo = iterator.next().getValue();
-
-          // Expand deleted dirs only on active snapshot. Deleted Snapshots
-          // will be cleaned up by SnapshotDeletingService.
-          if (currSnapInfo.getSnapshotStatus() != SNAPSHOT_ACTIVE ||
-              currSnapInfo.getDeepCleanedDeletedDir()) {
-            continue;
-          }
-
-          ReferenceCounted<IOmMetadataReader, SnapshotCache>
-              rcPrevOmSnapshot = null;
-          ReferenceCounted<IOmMetadataReader, SnapshotCache>
-              rcPrevToPrevOmSnapshot = null;
-          try {
-            long volumeId = metadataManager
-                .getVolumeId(currSnapInfo.getVolumeName());
-            // Get bucketInfo for the snapshot bucket to get bucket layout.
-            String dbBucketKey = metadataManager
-                .getBucketKey(currSnapInfo.getVolumeName(),
-                    currSnapInfo.getBucketName());
-            OmBucketInfo bucketInfo = metadataManager
-                .getBucketTable().get(dbBucketKey);
-
-            if (bucketInfo == null) {
-              throw new IllegalStateException("Bucket " + "/" +
-                  currSnapInfo.getVolumeName() + "/" + currSnapInfo
-                  .getBucketName() +
-                  " is not found. BucketInfo should not be " +
-                  "null for snapshotted bucket. The OM is in " +
-                  "unexpected state.");
-            }
-
-            SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(
-                currSnapInfo, snapChainManager, omSnapshotManager);
-            SnapshotInfo previousToPrevSnapshot = null;
-
-            Table<String, OmKeyInfo> previousKeyTable = null;
-            Table<String, String> prevRenamedTable = null;
-
-            if (previousSnapshot != null) {
-              rcPrevOmSnapshot = omSnapshotManager.checkForSnapshot(
-                  previousSnapshot.getVolumeName(),
-                  previousSnapshot.getBucketName(),
-                  getSnapshotPrefix(previousSnapshot.getName()), false);
-              OmSnapshot omPreviousSnapshot = (OmSnapshot)
-                  rcPrevOmSnapshot.get();
-
-              previousKeyTable = omPreviousSnapshot.getMetadataManager()
-                  .getKeyTable(bucketInfo.getBucketLayout());
-              prevRenamedTable = omPreviousSnapshot
-                  .getMetadataManager().getSnapshotRenamedTable();
-              previousToPrevSnapshot = getPreviousActiveSnapshot(
-                  previousSnapshot, snapChainManager, omSnapshotManager);
-            }
-
-            Table<String, OmKeyInfo> previousToPrevKeyTable = null;
-            if (previousToPrevSnapshot != null) {
-              rcPrevToPrevOmSnapshot = omSnapshotManager.checkForSnapshot(
-                  previousToPrevSnapshot.getVolumeName(),
-                  previousToPrevSnapshot.getBucketName(),
-                  getSnapshotPrefix(previousToPrevSnapshot.getName()), false);
-              OmSnapshot omPreviousToPrevSnapshot = (OmSnapshot)
-                  rcPrevToPrevOmSnapshot.get();
-
-              previousToPrevKeyTable = omPreviousToPrevSnapshot
-                  .getMetadataManager()
-                  .getKeyTable(bucketInfo.getBucketLayout());
-            }
-
-            String dbBucketKeyForDir = getOzonePathKeyForFso(metadataManager,
-                currSnapInfo.getVolumeName(), currSnapInfo.getBucketName());
-            try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
-                     rcCurrOmSnapshot = omSnapshotManager.checkForSnapshot(
-                currSnapInfo.getVolumeName(),
-                currSnapInfo.getBucketName(),
-                getSnapshotPrefix(currSnapInfo.getName()),
-                false)) {
-
-              OmSnapshot currOmSnapshot = (OmSnapshot) rcCurrOmSnapshot.get();
-              Table<String, OmKeyInfo> snapDeletedDirTable =
-                  currOmSnapshot.getMetadataManager().getDeletedDirTable();
-
-              try (TableIterator<String, ? extends Table.KeyValue<String,
-                  OmKeyInfo>> deletedDirIterator = snapDeletedDirTable
-                  .iterator(dbBucketKeyForDir)) {
-
-                while (deletedDirIterator.hasNext()) {
-                  Table.KeyValue<String, OmKeyInfo> deletedDirInfo =
-                      deletedDirIterator.next();
-
-                  // For each deleted directory we do an in-memory DFS and
-                  // do a deep clean and exclusive size calculation.
-                  iterateDirectoryTree(deletedDirInfo, volumeId, bucketInfo,
-                      previousSnapshot, previousToPrevSnapshot,
-                      currOmSnapshot, previousKeyTable, prevRenamedTable,
-                      previousToPrevKeyTable, dbBucketKeyForDir);
-                }
-                updateDeepCleanSnapshotDir(currSnapInfo.getTableKey());
-                if (previousSnapshot != null) {
-                  updateExclusiveSize(previousSnapshot.getTableKey());
-                }
-              }
-            }
-          } finally {
-            IOUtils.closeQuietly(rcPrevOmSnapshot, rcPrevToPrevOmSnapshot);
-          }
-        }
-      } catch (IOException ex) {
-        LOG.error("Error while running directory deep clean on snapshots." +
-            " Will retry at next run.", ex);
-      }
-      return BackgroundTaskResult.EmptyTaskResult.newResult();
-    }
-  }
-
-  @SuppressWarnings("checkstyle:ParameterNumber")
-  private void iterateDirectoryTree(
-      Table.KeyValue<String, OmKeyInfo> deletedDirInfo, long volumeId,
-      OmBucketInfo bucketInfo,
-      SnapshotInfo previousSnapshot,
-      SnapshotInfo previousToPrevSnapshot,
-      OmSnapshot currOmSnapshot,
-      Table<String, OmKeyInfo> previousKeyTable,
-      Table<String, String> prevRenamedTable,
-      Table<String, OmKeyInfo> previousToPrevKeyTable,
-      String dbBucketKeyForDir) throws IOException {
-
-    Table<String, OmDirectoryInfo> snapDirTable =
-        currOmSnapshot.getMetadataManager().getDirectoryTable();
-    Table<String, String> snapRenamedTable =
-        currOmSnapshot.getMetadataManager().getSnapshotRenamedTable();
-
-    Stack<StackNode> stackNodes = new Stack<>();
-    OmDirectoryInfo omDeletedDirectoryInfo =
-        getDirectoryInfo(deletedDirInfo.getValue());
-    String dirPathDbKey = currOmSnapshot.getMetadataManager()
-        .getOzonePathKey(volumeId, bucketInfo.getObjectID(),
-            omDeletedDirectoryInfo);
-    // Stack Init
-    StackNode topLevelDir = new StackNode();
-    topLevelDir.setDirKey(dirPathDbKey);
-    topLevelDir.setDirValue(omDeletedDirectoryInfo);
-    stackNodes.push(topLevelDir);
-
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
-            directoryIterator = snapDirTable.iterator(dbBucketKeyForDir)) {
-
-      while (!stackNodes.isEmpty()) {
-        StackNode stackTop = stackNodes.peek();
-        // First process all the files in the current directory
-        // and then do a DFS for directory.
-        if (StringUtils.isEmpty(stackTop.getSubDirSeek())) {
-          processFilesUnderDir(previousSnapshot,
-              previousToPrevSnapshot,
-              volumeId,
-              bucketInfo,
-              stackTop.getDirValue(),
-              currOmSnapshot.getMetadataManager(),
-              snapRenamedTable,
-              previousKeyTable,
-              prevRenamedTable,
-              previousToPrevKeyTable);
-          // Format : /volId/bucketId/parentId/
-          String seekDirInDB = currOmSnapshot.getMetadataManager()
-              .getOzonePathKey(volumeId, bucketInfo.getObjectID(),
-                  stackTop.getDirValue().getObjectID(), "");
-          stackTop.setSubDirSeek(seekDirInDB);
-        } else {
-          // Adding \0 to seek the next greater element.
-          directoryIterator.seek(stackTop.getSubDirSeek() + "\0");
-          if (directoryIterator.hasNext()) {
-
-            Table.KeyValue<String, OmDirectoryInfo> deletedSubDirInfo = directoryIterator.next();
-            String deletedSubDirKey = deletedSubDirInfo.getKey();
-            String prefixCheck = currOmSnapshot.getMetadataManager()
-                .getOzoneDeletePathDirKey(stackTop.getSubDirSeek());
-            // Exit if it is out of the sub dir prefix scope.
-            if (!deletedSubDirKey.startsWith(prefixCheck)) {
-              stackNodes.pop();
-            } else {
-              stackTop.setSubDirSeek(deletedSubDirKey);
-              StackNode nextSubDir = new StackNode();
-              nextSubDir.setDirKey(deletedSubDirInfo.getKey());
-              nextSubDir.setDirValue(deletedSubDirInfo.getValue());
-              stackNodes.push(nextSubDir);
-            }
-          } else {
-            stackNodes.pop();
-          }
-        }
-      }
-    }
-  }
-
-  private void updateExclusiveSize(String prevSnapshotKeyTable) {
-    ClientId clientId = ClientId.randomId();
-    SnapshotSize snapshotSize = SnapshotSize.newBuilder()
-            .setExclusiveSize(
-                exclusiveSizeMap.getOrDefault(prevSnapshotKeyTable, 0L))
-            .setExclusiveReplicatedSize(
-                exclusiveReplicatedSizeMap.getOrDefault(
-                    prevSnapshotKeyTable, 0L))
-            .build();
-    exclusiveSizeMap.remove(prevSnapshotKeyTable);
-    exclusiveReplicatedSizeMap.remove(prevSnapshotKeyTable);
-    SetSnapshotPropertyRequest
-        setSnapshotPropertyRequest =
-        SetSnapshotPropertyRequest.newBuilder()
-            .setSnapshotKey(prevSnapshotKeyTable)
-            .setSnapshotSize(snapshotSize)
-            .build();
-
-    OMRequest omRequest = OMRequest.newBuilder()
-            .setCmdType(Type.SetSnapshotProperty)
-            .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
-            .setClientId(clientId.toString())
-            .build();
-
-    submitRequest(omRequest, clientId);
-  }
-
-  @SuppressWarnings("checkstyle:ParameterNumber")
-  private void processFilesUnderDir(
-      SnapshotInfo previousSnapshot,
-      SnapshotInfo previousToPrevSnapshot,
-      long volumeId,
-      OmBucketInfo bucketInfo,
-      OmDirectoryInfo parentInfo,
-      OMMetadataManager metadataManager,
-      Table<String, String> snapRenamedTable,
-      Table<String, OmKeyInfo> previousKeyTable,
-      Table<String, String> prevRenamedTable,
-      Table<String, OmKeyInfo> previousToPrevKeyTable)
-        throws IOException {
-    String seekFileInDB = metadataManager.getOzonePathKey(volumeId,
-        bucketInfo.getObjectID(),
-        parentInfo.getObjectID(), "");
-    List<BlockGroup> blocksForKeyDelete = new ArrayList<>();
-
-    Table<String, OmKeyInfo> fileTable = metadataManager.getFileTable();
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
-             iterator = fileTable.iterator(seekFileInDB)) {
-
-      while (iterator.hasNext()) {
-        Table.KeyValue<String, OmKeyInfo> entry = iterator.next();
-        OmKeyInfo fileInfo = entry.getValue();
-        if (!OMFileRequest.isImmediateChild(fileInfo.getParentObjectID(),
-            parentInfo.getObjectID())) {
-          break;
-        }
-
-        String ozoneDeletePathKey = metadataManager
-            .getOzoneDeletePathKey(fileInfo.getObjectID(), entry.getKey());
-        if (isKeyReclaimable(previousKeyTable, snapRenamedTable,
-            fileInfo, bucketInfo, volumeId, null)) {
-          for (OmKeyLocationInfoGroup keyLocations :
-              fileInfo.getKeyLocationVersions()) {
-            List<BlockID> item = keyLocations.getLocationList().stream()
-                .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
-                .collect(Collectors.toList());
-            BlockGroup keyBlocks = BlockGroup.newBuilder()
-                .setKeyName(ozoneDeletePathKey)
-                .addAllBlockIDs(item)
-                .build();
-            blocksForKeyDelete.add(keyBlocks);
-          }
-          // TODO: Add Retry mechanism.
-          getScmClient().deleteKeyBlocks(blocksForKeyDelete);
-        } else if (previousSnapshot != null) {
-          calculateExclusiveSize(previousSnapshot, previousToPrevSnapshot,
-              fileInfo, bucketInfo, volumeId, snapRenamedTable,
-              previousKeyTable, prevRenamedTable, previousToPrevKeyTable,
-              exclusiveSizeMap, exclusiveReplicatedSizeMap);
-        }
-      }
-    }
-  }
-
-  private void updateDeepCleanSnapshotDir(String snapshotKeyTable) {
-    ClientId clientId = ClientId.randomId();
-    SetSnapshotPropertyRequest setSnapshotPropertyRequest =
-        SetSnapshotPropertyRequest.newBuilder()
-            .setSnapshotKey(snapshotKeyTable)
-            .setDeepCleanedDeletedDir(true)
-            .build();
-
-    OMRequest omRequest = OMRequest.newBuilder()
-            .setCmdType(Type.SetSnapshotProperty)
-            .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
-            .setClientId(clientId.toString())
-            .build();
-
-    submitRequest(omRequest, clientId);
-  }
-
-  public void submitRequest(OMRequest omRequest, ClientId clientId) {
-    try {
-      if (isRatisEnabled()) {
-        OzoneManagerRatisServer server =
-            getOzoneManager().getOmRatisServer();
-
-        RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
-            .setClientId(clientId)
-            .setServerId(server.getRaftPeerId())
-            .setGroupId(server.getRaftGroupId())
-            .setCallId(getRunCount().get())
-            .setMessage(Message.valueOf(
-                OMRatisHelper.convertRequestToByteString(omRequest)))
-            .setType(RaftClientRequest.writeRequestType())
-            .build();
-
-        server.submitRequest(omRequest, raftClientRequest);
-      } else {
-        getOzoneManager().getOmServerProtocol()
-            .submitRequest(null, omRequest);
-      }
-    } catch (ServiceException e) {
-      LOG.error("Snapshot deep cleaning request failed. " +
-          "Will retry at next run.", e);
-    }
-  }
-
-  /**
-   * Stack node data for directory deep clean for snapshot.
-   */
-  private static class StackNode {
-    private String dirKey;
-    private OmDirectoryInfo dirValue;
-    private String subDirSeek;
-
-    public String getDirKey() {
-      return dirKey;
-    }
-
-    public void setDirKey(String dirKey) {
-      this.dirKey = dirKey;
-    }
-
-    public OmDirectoryInfo getDirValue() {
-      return dirValue;
-    }
-
-    public void setDirValue(OmDirectoryInfo dirValue) {
-      this.dirValue = dirValue;
-    }
-
-    public String getSubDirSeek() {
-      return subDirSeek;
-    }
-
-    public void setSubDirSeek(String subDirSeek) {
-      this.subDirSeek = subDirSeek;
-    }
-
-    @Override
-    public String toString() {
-      return "StackNode{" +
-          "dirKey='" + dirKey + '\'' +
-          ", dirObjectId=" + dirValue.getObjectID() +
-          ", subDirSeek='" + subDirSeek + '\'' +
-          '}';
-    }
-  }
-}
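The service deleted above avoids recursion by driving the directory walk with an explicit stack plus a seek-one-past trick: appending "\0" to the last visited key positions the sorted iterator on the next strictly greater key, and a prefix check decides whether that key still belongs to the directory on top of the stack. Below is a minimal, self-contained sketch of that traversal over a TreeMap standing in for the RocksDB directory table; the key layout and all names are illustrative, not Ozone APIs.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Map;
    import java.util.TreeMap;

    public final class SeekBasedDfsSketch {

      // One stack frame: the directory's object id and the last key visited
      // under it, playing the role of the deleted StackNode's subDirSeek.
      private static final class Frame {
        final long objectId;
        String seek;
        Frame(long objectId, String seek) {
          this.objectId = objectId;
          this.seek = seek;
        }
      }

      // Keys are laid out /volId/bucketId/parentId/name, so every child of a
      // directory shares the prefix /volId/bucketId/<its objectId>/.
      private static String childPrefix(long objectId) {
        return "/v1/b1/" + objectId + "/";
      }

      public static void walk(TreeMap<String, Long> dirTable, long rootId) {
        Deque<Frame> stack = new ArrayDeque<>();
        stack.push(new Frame(rootId, childPrefix(rootId)));
        while (!stack.isEmpty()) {
          Frame top = stack.peek();
          // higherEntry(seek) returns the least key strictly greater than
          // seek: the TreeMap equivalent of iterator.seek(seek + "\0").
          Map.Entry<String, Long> next = dirTable.higherEntry(top.seek);
          if (next == null || !next.getKey().startsWith(childPrefix(top.objectId))) {
            stack.pop();            // out of this directory's prefix scope
            continue;
          }
          top.seek = next.getKey(); // resume siblings here after the subtree
          System.out.println("visiting deleted dir " + next.getKey());
          stack.push(new Frame(next.getValue(), childPrefix(next.getValue())));
        }
      }

      public static void main(String[] args) {
        TreeMap<String, Long> dirTable = new TreeMap<>();
        dirTable.put("/v1/b1/100/a", 101L);
        dirTable.put("/v1/b1/100/d", 104L);
        dirTable.put("/v1/b1/101/b", 102L);
        dirTable.put("/v1/b1/101/c", 103L);
        walk(dirTable, 100L);       // prints a, b, c, d in DFS preorder
      }
    }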
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotSetPropertyRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotSetPropertyRequestAndResponse.java
index 09ce3ac3bf..f643688f8b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotSetPropertyRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotSetPropertyRequestAndResponse.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotSetPropertyRespons
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotProperty;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
@@ -150,14 +150,14 @@ public class TestOMSnapshotSetPropertyRequestAndResponse {
              iterator = omMetadataManager.getSnapshotInfoTable().iterator()) {
       while (iterator.hasNext()) {
         String snapDbKey = iterator.next().getKey();
-        SnapshotSize snapshotSize = SnapshotSize.newBuilder()
+        SnapshotProperty snapshotSize = SnapshotProperty.newBuilder()
+            .setSnapshotKey(snapDbKey)
             .setExclusiveSize(exclusiveSize)
             .setExclusiveReplicatedSize(exclusiveSizeAfterRepl)
             .build();
         SetSnapshotPropertyRequest snapshotUpdateSizeRequest =
             SetSnapshotPropertyRequest.newBuilder()
-                .setSnapshotKey(snapDbKey)
-                .setSnapshotSize(snapshotSize)
+                .setSnapshotProperty(snapshotSize)
                 .build();
 
         OMRequest omRequest = OMRequest.newBuilder()
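The hunk above restores the SnapshotProperty message, which carries its own snapshotKey, in place of the SnapshotSize message paired with a snapshotKey field on the request itself. Below is a minimal sketch of the restored (post-revert) request assembly, using only builder calls visible in this patch; it is an illustration, not code from the test.

    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotProperty;
    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
    import org.apache.ratis.protocol.ClientId;

    public final class SetSnapshotPropertySketch {
      // Post-revert shape: the snapshot key travels inside SnapshotProperty,
      // not as a separate field on SetSnapshotPropertyRequest.
      public static OMRequest buildSizeUpdate(String snapDbKey, long exclusiveSize,
          long exclusiveReplicatedSize) {
        SnapshotProperty property = SnapshotProperty.newBuilder()
            .setSnapshotKey(snapDbKey)
            .setExclusiveSize(exclusiveSize)
            .setExclusiveReplicatedSize(exclusiveReplicatedSize)
            .build();
        SetSnapshotPropertyRequest request = SetSnapshotPropertyRequest.newBuilder()
            .setSnapshotProperty(property)
            .build();
        return OMRequest.newBuilder()
            .setCmdType(Type.SetSnapshotProperty)
            .setSetSnapshotPropertyRequest(request)
            .setClientId(ClientId.randomId().toString())
            .build();
      }
    }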
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index cc8dee24bd..df27bae1ba 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -121,7 +121,7 @@ public class TestKeyDeletingService {
     }
     System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
     ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
-    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1000,
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
         100, TimeUnit.MILLISECONDS);
@@ -207,7 +207,7 @@ public class TestKeyDeletingService {
     // Make sure that we have run the background thread 5 times more
     GenericTestUtils.waitFor(
         () -> keyDeletingService.getRunCount().get() >= 5,
-        100, 10000);
+        100, 1000);
     // Since SCM calls are failing, deletedKeyCount should be zero.
     assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
     assertEquals(keyCount, keyManager
@@ -544,7 +544,7 @@ t
     // Create Snap3, traps all the deleted keys.
     writeClient.createSnapshot(volumeName, bucketName, "snap3");
     assertTableRowCount(snapshotInfoTable, 3, metadataManager);
-    checkSnapDeepCleanStatus(snapshotInfoTable, false);
+    checkSnapDeepCleanStatus(snapshotInfoTable, true);
 
     keyDeletingService.resume();
 
@@ -564,8 +564,9 @@ t
 
       assertTableRowCount(snap3deletedTable, 0, metadataManager);
       assertTableRowCount(deletedTable, 0, metadataManager);
-      checkSnapDeepCleanStatus(snapshotInfoTable, true);
+      checkSnapDeepCleanStatus(snapshotInfoTable, false);
     }
+
   }
 
   @Test
@@ -669,7 +670,6 @@ t
              iterator = snapshotInfoTable.iterator()) {
       while (iterator.hasNext()) {
         Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
-        System.out.println(snapshotEntry.getValue());
         String snapshotName = snapshotEntry.getValue().getName();
         assertEquals(expectedSize.get(snapshotName),
             snapshotEntry.getValue().

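The timing hunks above rely on polling assertions of the form waitFor(condition, checkEveryMillis, waitForMillis). Below is a minimal, self-contained sketch of that polling pattern; it is illustrative only, not the GenericTestUtils implementation used by these tests.

    import java.util.function.BooleanSupplier;

    public final class WaitForSketch {
      // Polls condition every checkEveryMillis until it holds, failing once
      // waitForMillis has elapsed, like the waitFor calls in the hunks above.
      public static void waitFor(BooleanSupplier condition, long checkEveryMillis,
          long waitForMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + waitForMillis;
        while (!condition.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new IllegalStateException(
                "Condition not met within " + waitForMillis + " ms");
          }
          Thread.sleep(checkEveryMillis);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Example: wait until at least 300 ms have passed, checking every 100 ms.
        waitFor(() -> System.currentTimeMillis() - start >= 300, 100, 1000);
      }
    }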
