This is an automated email from the ASF dual-hosted git repository.

sshenoy pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this 
push:
     new f77dfa6f59 HDDS-6962. [Snapshot] Background Service to delete 
irrelevant SST files in a snapshot. (#3883)
f77dfa6f59 is described below

commit f77dfa6f5911dfd0e73badbc0ebbb05747337ff0
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Tue Dec 6 11:05:15 2022 +0530

    HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in 
a snapshot. (#3883)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   6 +
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   2 +
 .../common/src/main/resources/ozone-default.xml    |  22 ++
 .../hadoop/hdds/utils/BooleanTriFunction.java      |  37 ++++
 .../org/apache/hadoop/hdds/utils/db/DBProfile.java |  25 +++
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   1 +
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java | 132 ++++++++++++
 hadoop-hdds/rocksdb-checkpoint-differ/pom.xml      |   5 +
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   |  75 +++++--
 .../org/apache/ozone/rocksdiff/RocksDiffUtils.java |  54 +++++
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     |   2 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  10 +
 .../hadoop/ozone/freon/TestOMSnapshotDAG.java      |  22 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   6 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  32 +++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   9 +
 .../hadoop/ozone/om/SstFilteringService.java       | 212 ++++++++++++++++++
 .../hadoop/ozone/om/TestKeyDeletingService.java    |  18 +-
 .../hadoop/ozone/om/TestSstFilteringService.java   | 237 +++++++++++++++++++++
 pom.xml                                            |   1 +
 20 files changed, 886 insertions(+), 22 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8ae3577ac4..4293f3ef1e 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -190,6 +190,12 @@ public final class OzoneConfigKeys {
   public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT
       = "300s"; // 300s for default
 
+  public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT =
+      "ozone.sst.filtering.service.timeout";
+  public static final String
+      OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT = "300s";
+      // 300s for default
+
   public static final String OZONE_BLOCK_DELETING_SERVICE_WORKERS =
       "ozone.block.deleting.service.workers";
   public static final int OZONE_BLOCK_DELETING_SERVICE_WORKERS_DEFAULT
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 97dd7fde0b..6b78a5c4a1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -554,4 +554,6 @@ public final class OzoneConsts {
   public static final String OM_SNAPSHOT_DIR = "db.snapshots";
   public static final String OM_SNAPSHOT_INDICATOR = ".snapshot";
 
+  public static final String FILTERED_SNAPSHOTS = "filtered-snapshots";
+
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c10344756a..788bc0e8a5 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3058,6 +3058,28 @@
       directory deleting service per time interval.
     </description>
   </property>
+  <property>
+    <name>ozone.snapshot.filtering.limit.per.task</name>
+    <value>2</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>A maximum number of snapshots to be filtered by
+      sst filtering service per time interval.
+    </description>
+  </property>
+  <property>
+    <name>ozone.snapshot.filtering.service.interval</name>
+    <value>1m</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>Time interval of the SST File filtering service from Snapshot.
+    </description>
+  </property>
+  <property>
+    <name>ozone.sst.filtering.service.timeout</name>
+    <value>300000ms</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>Timeout value for the SST filtering service.
+    </description>
+  </property>
 
   <property>
     <name>ozone.scm.event.ContainerReport.thread.pool.size</name>
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BooleanTriFunction.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BooleanTriFunction.java
new file mode 100644
index 0000000000..c9ed1ee667
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BooleanTriFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Defines a functional interface having three inputs and producing a
+ * result, typically a {@code Boolean}.
+ */
+@FunctionalInterface
+public interface BooleanTriFunction<T, U, V, R> {
+
+  R apply(T t, U u, V v);
+
+  default <K> BooleanTriFunction<T, U, V, K> andThen(
+      Function<? super R, ? extends K> after) {
+    Objects.requireNonNull(after);
+    return (T t, U u, V v) -> after.apply(apply(t, u, v));
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java
index b3dbb857ab..a0e300fb7f 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java
@@ -117,6 +117,31 @@ public enum DBProfile {
       return cfOptions;
     }
 
+    @Override
+    public ManagedBlockBasedTableConfig getBlockBasedTableConfig() {
+      return SSD.getBlockBasedTableConfig();
+    }
+  },
+  TEST {
+    @Override
+    public String toString() {
+      return "TEST";
+    }
+
+    @Override
+    public ManagedDBOptions getDBOptions() {
+      ManagedDBOptions dbOptions = SSD.getDBOptions();
+      return dbOptions;
+    }
+
+    @Override
+    public ManagedColumnFamilyOptions getColumnFamilyOptions() {
+      ManagedColumnFamilyOptions cfOptions = SSD.getColumnFamilyOptions();
+      cfOptions.setCompactionStyle(CompactionStyle.LEVEL);
+      cfOptions.setDisableAutoCompactions(true);
+      return cfOptions;
+    }
+
     @Override
     public ManagedBlockBasedTableConfig getBlockBasedTableConfig() {
       return SSD.getBlockBasedTableConfig();
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 992627c4cf..adafe079a8 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -42,6 +42,7 @@ import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import com.google.common.base.Preconditions;
 import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.rocksdb.RocksDBException;
+
 import org.rocksdb.TransactionLogIterator.BatchResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 6f20dcee7b..b96ed1d768 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdds.utils.db;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedCheckpoint;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
@@ -32,6 +35,7 @@ import 
org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.Holder;
+import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,10 +43,12 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +74,9 @@ public final class RocksDatabase {
 
   static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys";
 
+  private static List<ColumnFamilyHandle> columnFamilyHandles =
+      new ArrayList<>();
+
 
   static IOException toIOException(Object name, String op, RocksDBException e) 
{
     return HddsServerUtil.toIOException(name + ": Failed to " + op, e);
@@ -136,6 +145,7 @@ public final class RocksDatabase {
         db = ManagedRocksDB.open(dbOptions, dbFile.getAbsolutePath(),
             descriptors, handles);
       }
+      columnFamilyHandles = handles;
       // init a column family map.
       for (ColumnFamilyHandle h : handles) {
         final ColumnFamily f = new ColumnFamily(h);
@@ -359,6 +369,27 @@ public final class RocksDatabase {
     }
   }
 
+  /**
+   * @param cfName columnFamily on which flush will run.
+   * @throws IOException
+   */
+  public void flush(String cfName) throws IOException {
+    ColumnFamilyHandle handle = getColumnFamilyHandle(cfName);
+    try (ManagedFlushOptions options = new ManagedFlushOptions()) {
+      options.setWaitForFlush(true);
+      if (handle != null) {
+        db.get().flush(options, handle);
+      } else {
+        LOG.error("Provided column family doesn't exist."
+            + " Calling flush on null columnFamily");
+        flush();
+      }
+    } catch (RocksDBException e) {
+      closeOnError(e);
+      throw toIOException(this, "flush", e);
+    }
+  }
+
   public void flushWal(boolean sync) throws IOException {
     try {
       db.get().flushWal(sync);
@@ -377,6 +408,41 @@ public final class RocksDatabase {
     }
   }
 
+  /**
+   * @param cfName columnFamily on which compaction will run.
+   * @throws IOException
+   */
+  public void compactRange(String cfName) throws IOException {
+    ColumnFamilyHandle handle = getColumnFamilyHandle(cfName);
+    try {
+      if (handle != null) {
+        db.get().compactRange(handle);
+      } else {
+        LOG.error("Provided column family doesn't exist."
+            + " Calling compactRange on null columnFamily");
+        db.get().compactRange();
+      }
+    } catch (RocksDBException e) {
+      closeOnError(e);
+      throw toIOException(this, "compactRange", e);
+    }
+  }
+
+  private ColumnFamilyHandle getColumnFamilyHandle(String cfName)
+      throws IOException {
+    for (ColumnFamilyHandle cf : getColumnFamilyHandles()) {
+      try {
+        if (cfName.equals(new String(cf.getName(), StandardCharsets.UTF_8))) {
+          return cf;
+        }
+      } catch (RocksDBException e) {
+        closeOnError(e);
+        throw toIOException(this, "columnFamilyHandle.getName", e);
+      }
+    }
+    return null;
+  }
+
   RocksCheckpoint createCheckpoint() {
     return new RocksCheckpoint();
   }
@@ -523,4 +589,70 @@ public final class RocksDatabase {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * Returns the max compaction level of SST files in the DB.
+   * @return the highest level among live SST files
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes SST files whose key range does not match the prefix
+   * for the corresponding table.
+   * @param prefixPairs a list of pairs (tableName, prefixUsed).
+   * @throws RocksDBException
+   */
+  public void deleteFilesNotMatchingPrefix(
+      List<Pair<String, String>> prefixPairs,
+      BooleanTriFunction<String, String, String, Boolean> filterFunction)
+      throws RocksDBException {
+    for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
+      String sstFileColumnFamily =
+          new String(liveFileMetaData.columnFamilyName(),
+              StandardCharsets.UTF_8);
+      int lastLevel = getLastLevel();
+      for (Pair<String, String> prefixPair : prefixPairs) {
+        String columnFamily = prefixPair.getKey();
+        String prefixForColumnFamily = prefixPair.getValue();
+        if (!sstFileColumnFamily.equals(columnFamily)) {
+          continue;
+        }
+        // RocksDB #deleteFile API allows only to delete the last level of
+        // SST Files. Any level < last level won't get deleted and
+        // only last file of level 0 can be deleted
+        // and will throw warning in the rocksdb manifest.
+        // Instead, perform the level check here
+        // itself to avoid failed delete attempts for lower level files.
+        if (liveFileMetaData.level() != lastLevel || lastLevel == 0) {
+          continue;
+        }
+        String firstDbKey =
+            new String(liveFileMetaData.smallestKey(), StandardCharsets.UTF_8);
+        String lastDbKey =
+            new String(liveFileMetaData.largestKey(), StandardCharsets.UTF_8);
+        boolean isKeyWithPrefixPresent =
+            filterFunction.apply(firstDbKey, lastDbKey, prefixForColumnFamily);
+        if (!isKeyWithPrefixPresent) {
+          String sstFileName = liveFileMetaData.fileName();
+          LOG.info("Deleting sst file {} corresponding to column family"
+                  + " {} from db: {}", sstFileName,
+              liveFileMetaData.columnFamilyName(), db.get().getName());
+          db.get().deleteFile(sstFileName);
+        }
+      }
+    }
+  }
+
+  public static List<ColumnFamilyHandle> getColumnFamilyHandles() {
+    return columnFamilyHandles;
+  }
+
+
 }
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml 
b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index 0c03876df0..c6af10fbc4 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -34,6 +34,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd";>
       <groupId>org.rocksdb</groupId>
       <artifactId>rocksdbjni</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>hdds-common</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 99ace91c80..4ad8a0f9b2 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -29,9 +29,11 @@ import org.rocksdb.CompactionJobInfo;
 import org.rocksdb.DBOptions;
 import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.SstFileReader;
+import org.rocksdb.SstFileReaderIterator;
 import org.rocksdb.TableProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +51,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -455,29 +458,31 @@ public class RocksDBCheckpointDiffer {
    * @return number of keys
    */
   private long getSSTFileSummary(String filename) throws RocksDBException {
+    Options option = new Options();
+    SstFileReader reader = new SstFileReader(option);
+
+    reader.open(getAbsoluteSstFilePath(filename));
+
+    TableProperties properties = reader.getTableProperties();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{} has {} keys", filename, properties.getNumEntries());
+    }
+    return properties.getNumEntries();
+  }
 
+  private String getAbsoluteSstFilePath(String filename) {
     if (!filename.endsWith(SST_FILE_EXTENSION)) {
       filename += SST_FILE_EXTENSION;
     }
-
-    Options option = new Options();
-    SstFileReader reader = new SstFileReader(option);
-
     File sstFile = new File(sstBackupDir + filename);
     File sstFileInActiveDB = new File(activeDBLocationStr + filename);
     if (sstFile.exists()) {
-      reader.open(sstBackupDir + filename);
+      return sstBackupDir + filename;
     } else if (sstFileInActiveDB.exists()) {
-      reader.open(activeDBLocationStr + filename);
+      return activeDBLocationStr + filename;
     } else {
       throw new RuntimeException("Can't find SST file: " + filename);
     }
-
-    TableProperties properties = reader.getTableProperties();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("{} has {} keys", filename, properties.getNumEntries());
-    }
-    return properties.getNumEntries();
   }
 
   /**
@@ -637,11 +642,14 @@ public class RocksDBCheckpointDiffer {
     private final String dbPath;
     private final String snapshotID;
     private final long snapshotGeneration;
+    private final Map<String, String> tablePrefixes;
 
-    public DifferSnapshotInfo(String db, String id, long gen) {
+    public DifferSnapshotInfo(String db, String id, long gen,
+        Map<String, String> prefixes) {
       dbPath = db;
       snapshotID = id;
       snapshotGeneration = gen;
+      tablePrefixes = prefixes;
     }
 
     public String getDbPath() {
@@ -656,6 +664,10 @@ public class RocksDBCheckpointDiffer {
       return snapshotGeneration;
     }
 
+    public Map<String, String> getTablePrefixes() {
+      return tablePrefixes;
+    }
+
     @Override
     public String toString() {
       return "DifferSnapshotInfo{" + "dbPath='" + dbPath + '\''
@@ -705,9 +717,45 @@ public class RocksDBCheckpointDiffer {
       LOG.debug("{}", logSB);
     }
 
+    if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
+      filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes());
+    }
+
     return new ArrayList<>(fwdDAGDifferentFiles);
   }
 
+  public void filterRelevantSstFiles(Set<String> inputFiles,
+      Map<String, String> tableToPrefixMap) {
+    for (String filename : inputFiles) {
+      String filepath = getAbsoluteSstFilePath(filename);
+      try (SstFileReader sstFileReader = new SstFileReader(new Options())) {
+        sstFileReader.open(filepath);
+        TableProperties properties = sstFileReader.getTableProperties();
+        String tableName = new String(properties.getColumnFamilyName(), UTF_8);
+        if (tableToPrefixMap.containsKey(tableName)) {
+          String prefix = tableToPrefixMap.get(tableName);
+          SstFileReaderIterator iterator =
+              sstFileReader.newIterator(new ReadOptions());
+          iterator.seekToFirst();
+          String firstKey = RocksDiffUtils
+              .constructBucketKey(new String(iterator.key(), UTF_8));
+          iterator.seekToLast();
+          String lastKey = RocksDiffUtils
+              .constructBucketKey(new String(iterator.key(), UTF_8));
+          if (!RocksDiffUtils
+              .isKeyWithPrefixPresent(prefix, firstKey, lastKey)) {
+            inputFiles.remove(filename);
+          }
+        } else {
+          // entry from other tables
+          inputFiles.remove(filename);
+        }
+      } catch (RocksDBException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
   /**
    * Core getSSTDiffList logic.
    */
@@ -1001,4 +1049,5 @@ public class RocksDBCheckpointDiffer {
   public static Logger getLog() {
     return LOG;
   }
+
 }
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
new file mode 100644
index 0000000000..d6d25bbede
--- /dev/null
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ozone.rocksdiff;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Helper methods for snap-diff operations.
+ */
+public final class RocksDiffUtils {
+
+  private RocksDiffUtils() {
+  }
+
+  public static boolean isKeyWithPrefixPresent(String prefixForColumnFamily,
+      String firstDbKey, String lastDbKey) {
+    return firstDbKey.compareTo(prefixForColumnFamily) <= 0
+        && prefixForColumnFamily.compareTo(lastDbKey) <= 0;
+  }
+
+  public static String constructBucketKey(String keyName) {
+    if (!keyName.startsWith(OzoneConsts.OM_KEY_PREFIX)) {
+      keyName = OzoneConsts.OM_KEY_PREFIX.concat(keyName);
+    }
+    String[] elements = keyName.split(OzoneConsts.OM_KEY_PREFIX);
+    String volume = elements[1];
+    String bucket = elements[2];
+    StringBuilder builder =
+        new StringBuilder().append(OzoneConsts.OM_KEY_PREFIX).append(volume);
+
+    if (StringUtils.isNotBlank(bucket)) {
+      builder.append(OzoneConsts.OM_KEY_PREFIX).append(bucket);
+    }
+    return builder.toString();
+  }
+
+
+}
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index 9a4c5c10b0..aa19220042 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -211,7 +211,7 @@ public class TestRocksDBCheckpointDiffer {
     createCheckPoint(TEST_DB_PATH, cpPath, rocksDB);
     final String snapshotId = "snap_id_" + snapshotGeneration;
     final DifferSnapshotInfo currentSnapshot =
-        new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration);
+        new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration, null);
     this.snapshots.add(currentSnapshot);
 
     // Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index ef94c0359d..105eb67fc6 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -320,6 +320,16 @@ public final class OMConfigKeys {
       "ozone.path.deleting.limit.per.task";
   public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
 
+  public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK =
+      "ozone.snapshot.filtering.limit.per.task";
+  public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2;
+
+  public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL =
+      "ozone.snapshot.filtering.service.interval";
+  public static final String
+      OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT = "60s";
+
+
   public static final String OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH =
       "ozone.om.grpc.maximum.response.length";
   /** Default value for GRPC_MAXIMUM_RESPONSE_LENGTH. */
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index d90a1c6e03..de02528e8a 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
@@ -49,7 +50,9 @@ import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
@@ -136,7 +139,24 @@ public class TestOMSnapshotDAG {
     // Use RocksDB transaction sequence number in SnapshotInfo, which is
     // persisted at the time of snapshot creation, as the snapshot generation
     return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotID(),
-        snapshotInfo.getDbTxSequenceNumber());
+        snapshotInfo.getDbTxSequenceNumber(),
+        getTablePrefixes(omMetadataManager, volumeName, bucketName));
+  }
+
+  private Map<String, String> getTablePrefixes(
+      OMMetadataManager omMetadataManager, String volumeName, String 
bucketName)
+      throws IOException {
+    HashMap<String, String> tablePrefixes = new HashMap<>();
+    String volumeId = 
String.valueOf(omMetadataManager.getVolumeId(volumeName));
+    String bucketId =
+        String.valueOf(omMetadataManager.getBucketId(volumeName, bucketName));
+    tablePrefixes.put(OmMetadataManagerImpl.KEY_TABLE,
+        OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName);
+    tablePrefixes.put(OmMetadataManagerImpl.FILE_TABLE,
+        OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId);
+    tablePrefixes.put(OmMetadataManagerImpl.DIRECTORY_TABLE,
+        OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId);
+    return tablePrefixes;
   }
 
   @Test
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 8bcffa60d2..c0ab7585a4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -231,4 +231,10 @@ public interface KeyManager extends OzoneManagerFS, 
IOzoneAcl {
    * @return Background service.
    */
   BackgroundService getOpenKeyCleanupService();
+
+  /**
+   * Returns the instance of Snapshot SST Filtering service.
+   * @return Background service.
+   */
+  BackgroundService getSnapshotSstFilteringService();
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a2de68dcc2..e30a3ba280 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -110,6 +110,8 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_LIST_TRASH_KE
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_LIST_TRASH_KEYS_MAX_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
@@ -117,6 +119,8 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
@@ -138,6 +142,7 @@ import org.slf4j.LoggerFactory;
 public class KeyManagerImpl implements KeyManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(KeyManagerImpl.class);
+  public static final int DISABLE_VALUE = -1;
 
   /**
    * A SCM block client, used to talk to SCM to allocate block during putKey.
@@ -152,6 +157,8 @@ public class KeyManagerImpl implements KeyManager {
 
   private BackgroundService keyDeletingService;
 
+  private BackgroundService snapshotSstFilteringService;
+
   private final KeyProviderCryptoExtension kmsProvider;
   private final boolean enableFileSystemPaths;
   private BackgroundService dirDeletingService;
@@ -236,6 +243,23 @@ public class KeyManagerImpl implements KeyManager {
           TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
       openKeyCleanupService.start();
     }
+
+    if (snapshotSstFilteringService == null) {
+      long serviceInterval = configuration.getTimeDuration(
+          OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
+          OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      long serviceTimeout = configuration.getTimeDuration(
+          OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT,
+          OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      if (serviceInterval != DISABLE_VALUE) {
+        snapshotSstFilteringService =
+            new SstFilteringService(serviceInterval, TimeUnit.MILLISECONDS,
+                serviceTimeout, ozoneManager, configuration);
+        snapshotSstFilteringService.start();
+      }
+    }
   }
 
   KeyProviderCryptoExtension getKMSProvider() {
@@ -256,6 +280,10 @@ public class KeyManagerImpl implements KeyManager {
       openKeyCleanupService.shutdown();
       openKeyCleanupService = null;
     }
+    if (snapshotSstFilteringService != null) {
+      snapshotSstFilteringService.shutdown();
+      snapshotSstFilteringService = null;
+    }
   }
 
   private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -570,6 +598,10 @@ public class KeyManagerImpl implements KeyManager {
     return openKeyCleanupService;
   }
 
  /**
   * Returns the snapshot SST filtering background service.
   *
   * @return the SST filtering service, or null if the service is disabled
   *         via its interval config or has been shut down.
   */
  public BackgroundService getSnapshotSstFilteringService() {
    return snapshotSstFilteringService;
  }
+
   @Override
   public OmMultipartUploadList listMultipartUploads(String volumeName,
       String bucketName, String prefix) throws OMException {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index eb77df3782..a620d451a7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -1417,6 +1417,15 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     return metadataManager;
   }
 
  /**
   * Get the snapshot manager.
   *
   * @return the {@link OmSnapshotManager} instance held by this OzoneManager.
   */
  public OmSnapshotManager getOmSnapshotManager() {
    return omSnapshotManager;
  }
+
   /**
    * Get metadata manager.
    *
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
new file mode 100644
index 0000000000..92bd08780f
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.ozone.rocksdiff.RocksDiffUtils;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  private BooleanTriFunction<String, String, String, Boolean> filterFunction =
+      (first, last, prefix) -> {
+        String firstBucketKey = RocksDiffUtils.constructBucketKey(first);
+        String lastBucketKey = RocksDiffUtils.constructBucketKey(last);
+        return RocksDiffUtils
+            .isKeyWithPrefixPresent(prefix, firstBucketKey, lastBucketKey);
+      };
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SstFilteringTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (
+          TableIterator<String, ? extends Table.KeyValue
+              <String, SnapshotInfo>> iterator = snapshotInfoTable
+              .iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotLimitPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {
+          Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
+          String snapShotTableKey = keyValue.getKey();
+          SnapshotInfo snapshotInfo = keyValue.getValue();
+
+          File omMetadataDir =
+              OMStorage.getOmDbDir(ozoneManager.getConfiguration());
+          String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+          Path filePath =
+              Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+
+          // If entry for the snapshotID is present in this file,
+          // it has already undergone filtering.
+          if (Files.exists(filePath)) {
+            List<String> processedSnapshotIds = Files.readAllLines(filePath);
+            if (processedSnapshotIds.contains(snapshotInfo.getSnapshotID())) {
+              continue;
+            }
+          }
+
+          LOG.debug("Processing snapshot {} to filter relevant SST Files",
+              snapShotTableKey);
+
+          List<Pair<String, String>> prefixPairs =
+              constructPrefixPairs(snapshotInfo);
+
+          String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+
+          RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
+              .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
+                  dbName, true);
+          RocksDatabase db = rdbStore.getDb();
+          db.deleteFilesNotMatchingPrefix(prefixPairs, filterFunction);
+
+          // mark the snapshot as filtered by writing to the file
+          String content = snapshotInfo.getSnapshotID() + "\n";
+          Files.write(filePath, content.getBytes(StandardCharsets.UTF_8),
+              StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+          snapshotLimit--;
+          snapshotFilteredCount.getAndIncrement();
+        }
+      } catch (RocksDBException | IOException e) {
+        LOG.error("Error during Snapshot sst filtering ", e);
+      }
+
+      // nothing to return here
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    /**
+     * @param snapshotInfo
+     * @return a list of pairs (tableName,keyPrefix).
+     * @throws IOException
+     */
+    private List<Pair<String, String>> constructPrefixPairs(
+        SnapshotInfo snapshotInfo) throws IOException {
+      String volumeName = snapshotInfo.getVolumeName();
+      String bucketName = snapshotInfo.getBucketName();
+
+      long volumeId = 
ozoneManager.getMetadataManager().getVolumeId(volumeName);
+      // TODO : HDDS-6984  buckets can be deleted via ofs
+      //  handle deletion of bucket case.
+      long bucketId =
+          ozoneManager.getMetadataManager().getBucketId(volumeName, 
bucketName);
+
+      String filterPrefix =
+          OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName;
+
+      String filterPrefixFSO =
+          OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId;
+
+      List<Pair<String, String>> prefixPairs = new ArrayList<>();
+      prefixPairs
+          .add(Pair.of(OmMetadataManagerImpl.KEY_TABLE, filterPrefix));
+      prefixPairs.add(
+          Pair.of(OmMetadataManagerImpl.DIRECTORY_TABLE, filterPrefixFSO));
+      prefixPairs
+          .add(Pair.of(OmMetadataManagerImpl.FILE_TABLE, filterPrefixFSO));
+      return prefixPairs;
+    }
+  }
+
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new SstFilteringTask());
+    return queue;
+  }
+
+  public AtomicLong getSnapshotFilteredCount() {
+    return snapshotFilteredCount;
+  }
+
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
index 4089a07095..dfd519b26f 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -228,9 +228,11 @@ public class TestKeyDeletingService {
     // Create 2 versions of the same key
     String keyName = String.format("key%s",
         RandomStringUtils.randomAlphanumeric(5));
-    OmKeyArgs keyArgs = createAndCommitKey(keyManager, volumeName, bucketName,
-        keyName, 1);
-    createAndCommitKey(keyManager, volumeName, bucketName, keyName, 2);
+    OmKeyArgs keyArgs =
+        createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
+            keyName, 1);
+    createAndCommitKey(writeClient, keyManager, volumeName, bucketName, 
keyName,
+        2);
 
     // Delete the key
     writeClient.deleteKey(keyArgs);
@@ -266,15 +268,16 @@ public class TestKeyDeletingService {
       createVolumeAndBucket(keyManager, volumeName, bucketName, false);
 
       // Create the key
-      OmKeyArgs keyArg = createAndCommitKey(keyManager, volumeName, bucketName,
-          keyName, numBlocks);
+      OmKeyArgs keyArg =
+          createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
+              keyName, numBlocks);
 
       // Delete the key
       writeClient.deleteKey(keyArg);
     }
   }
 
-  private void createVolumeAndBucket(KeyManager keyManager, String volumeName,
+  static void createVolumeAndBucket(KeyManager keyManager, String volumeName,
       String bucketName, boolean isVersioningEnabled) throws IOException {
     // cheat here, just create a volume and bucket entry so that we can
     // create the keys, we put the same data for key and value since the
@@ -293,7 +296,8 @@ public class TestKeyDeletingService {
             .build());
   }
 
-  private OmKeyArgs createAndCommitKey(KeyManager keyManager, String 
volumeName,
+  static OmKeyArgs createAndCommitKey(OzoneManagerProtocol writeClient,
+      KeyManager keyManager, String volumeName,
       String bucketName, String keyName, int numBlocks) throws IOException {
     OmKeyArgs keyArg =
         new OmKeyArgs.Builder()
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
new file mode 100644
index 0000000000..35dc07a9ad
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.hdds.utils.db.DBProfile;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
+
+/**
+ * Test SST Filtering Service.
+ */
+public class TestSstFilteringService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSstFilteringService.class);
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  private OzoneConfiguration createConfAndInitValues() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    File newFolder = folder.newFolder();
+    if (!newFolder.exists()) {
+      Assert.assertTrue(newFolder.mkdirs());
+    }
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
+    conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
+    conf.setQuietMode(false);
+
+    return conf;
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  /**
+   * Test checks whether for existing snapshots
+   * the checkpoint should not have any sst files that do not correspond to
+   * the bucket on which create snapshot command was issued.
+   *
+   * The SSTFiltering service deletes only the last level of
+   * sst file (rocksdb behaviour).
+   *
+   * 1. Create Keys for vol1/buck1 (L0 ssts will be created for vol1/buck1)
+   * 2. compact the db (new level SSTS will be created for vol1/buck1)
+   * 3. Create keys for vol1/buck2 (L0 ssts will be created for vol1/buck2)
+   * 4. Take snapshot on vol1/buck2.
+   * 5. The snapshot will contain compacted sst files pertaining to vol1/buck1
+   *    Wait till the BG service deletes these.
+   *
+   * @throws IOException - on Failure.
+   */
+
+  @Test
+  public void testIrrelevantSstFileDeletion()
+      throws IOException, TimeoutException, InterruptedException,
+      AuthenticationException {
+    OzoneConfiguration conf = createConfAndInitValues();
+    OmTestManagers omTestManagers = new OmTestManagers(conf);
+    KeyManager keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+    RDBStore store = (RDBStore) om.getMetadataManager().getStore();
+
+    final int keyCount = 100;
+    createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1);
+    SstFilteringService sstFilteringService =
+        (SstFilteringService) keyManager.getSnapshotSstFilteringService();
+
+    String rocksDbDir = om.getRocksDbDirectory();
+
+    store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
+
+    createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1);
+    store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
+
+    int level0FilesCount = 0;
+    int totalFileCount = 0;
+
+    List<LiveFileMetaData> initialsstFileList = store.getDb().getSstFileList();
+    for (LiveFileMetaData fileMetaData : initialsstFileList) {
+      totalFileCount++;
+      if (fileMetaData.level() == 0) {
+        level0FilesCount++;
+      }
+    }
+    LOG.debug("Total files : {}", totalFileCount);
+    LOG.debug("Total L0 files: {}", level0FilesCount);
+
+    Assert.assertEquals(totalFileCount, level0FilesCount);
+
+    store.getDb().compactRange(OmMetadataManagerImpl.KEY_TABLE);
+
+    int level0FilesCountAfterCompact = 0;
+    int totalFileCountAfterCompact = 0;
+    int nonlevel0FilesCountAfterCompact = 0;
+    List<LiveFileMetaData> nonlevelOFiles = new ArrayList<>();
+
+    for (LiveFileMetaData fileMetaData : store.getDb().getSstFileList()) {
+      totalFileCountAfterCompact++;
+      if (fileMetaData.level() == 0) {
+        level0FilesCountAfterCompact++;
+      } else {
+        nonlevel0FilesCountAfterCompact++;
+        nonlevelOFiles.add(fileMetaData);
+      }
+    }
+
+    LOG.debug("Total files : {}", totalFileCountAfterCompact);
+    LOG.debug("Total L0 files: {}", level0FilesCountAfterCompact);
+    LOG.debug("Total non L0/compacted files: {}",
+        nonlevel0FilesCountAfterCompact);
+
+    Assert.assertTrue(nonlevel0FilesCountAfterCompact > 0);
+
+    createKeys(keyManager, "vol1", "buck2", keyCount, 1);
+
+    store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
+
+    List<LiveFileMetaData> allFiles = store.getDb().getSstFileList();
+
+    writeClient.createSnapshot("vol1", "buck2", "snapshot1");
+
+    GenericTestUtils.waitFor(
+        () -> sstFilteringService.getSnapshotFilteredCount().get() >= 1, 1000,
+        10000);
+
+    Assert
+        .assertEquals(1, sstFilteringService.getSnapshotFilteredCount().get());
+
+    SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
+        .get(SnapshotInfo.getTableKey("vol1", "buck2", "snapshot1"));
+
+    String dbSnapshots = rocksDbDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+    String snapshotDirName =
+        dbSnapshots + OM_KEY_PREFIX + OM_DB_NAME + snapshotInfo
+            .getCheckpointDirName();
+
+    for (LiveFileMetaData file : allFiles) {
+      File sstFile =
+          new File(snapshotDirName + OM_KEY_PREFIX + file.fileName());
+      if (nonlevelOFiles.stream()
+          .anyMatch(o -> file.fileName().equals(o.fileName()))) {
+        Assert.assertFalse(sstFile.exists());
+      } else {
+        Assert.assertTrue(sstFile.exists());
+      }
+    }
+
+    List<String> processedSnapshotIds = Files
+        .readAllLines(Paths.get(dbSnapshots, OzoneConsts.FILTERED_SNAPSHOTS));
+    Assert.assertTrue(
+        processedSnapshotIds.contains(snapshotInfo.getSnapshotID()));
+
+  }
+
+  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
+  private void createKeys(KeyManager keyManager, String volumeName,
+      String bucketName, int keyCount, int numBlocks) throws IOException {
+    for (int x = 0; x < keyCount; x++) {
+      String keyName =
+          String.format("key%s", RandomStringUtils.randomAlphanumeric(5));
+      // Create Volume and Bucket
+      TestKeyDeletingService
+          .createVolumeAndBucket(keyManager, volumeName, bucketName, false);
+
+      // Create the key
+      TestKeyDeletingService
+          .createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
+              keyName, numBlocks);
+    }
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 14d1a473fe..c5b4a75574 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1836,6 +1836,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xs
                       <allowedImport>org.rocksdb.StatsLevel</allowedImport>
                       
<allowedImport>org.rocksdb.TransactionLogIterator.BatchResult</allowedImport>
                       <allowedImport>org.rocksdb.TickerType</allowedImport>
+                      
<allowedImport>org.rocksdb.LiveFileMetaData</allowedImport>
 
                       <!-- Allow RocksObjects whose native pointer is managed 
by RocksDB. -->
                       
<allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to