This is an automated email from the ASF dual-hosted git repository.
sshenoy pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this
push:
new f77dfa6f59 HDDS-6962. [Snapshot] Background Service to delete
irrelevant SST files in a snapshot. (#3883)
f77dfa6f59 is described below
commit f77dfa6f5911dfd0e73badbc0ebbb05747337ff0
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Tue Dec 6 11:05:15 2022 +0530
HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in
a snapshot. (#3883)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 6 +
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 2 +
.../common/src/main/resources/ozone-default.xml | 22 ++
.../hadoop/hdds/utils/BooleanTriFunction.java | 37 ++++
.../org/apache/hadoop/hdds/utils/db/DBProfile.java | 25 +++
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 1 +
.../apache/hadoop/hdds/utils/db/RocksDatabase.java | 132 ++++++++++++
hadoop-hdds/rocksdb-checkpoint-differ/pom.xml | 5 +
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 75 +++++--
.../org/apache/ozone/rocksdiff/RocksDiffUtils.java | 54 +++++
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 2 +-
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 10 +
.../hadoop/ozone/freon/TestOMSnapshotDAG.java | 22 +-
.../org/apache/hadoop/ozone/om/KeyManager.java | 6 +
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 32 +++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 9 +
.../hadoop/ozone/om/SstFilteringService.java | 212 ++++++++++++++++++
.../hadoop/ozone/om/TestKeyDeletingService.java | 18 +-
.../hadoop/ozone/om/TestSstFilteringService.java | 237 +++++++++++++++++++++
pom.xml | 1 +
20 files changed, 886 insertions(+), 22 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8ae3577ac4..4293f3ef1e 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -190,6 +190,12 @@ public final class OzoneConfigKeys {
public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT
= "300s"; // 300s for default
+ public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT =
+ "ozone.sst.filtering.service.timeout";
+ public static final String
+ OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT = "300s";
+ // 300s for default
+
public static final String OZONE_BLOCK_DELETING_SERVICE_WORKERS =
"ozone.block.deleting.service.workers";
public static final int OZONE_BLOCK_DELETING_SERVICE_WORKERS_DEFAULT
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 97dd7fde0b..6b78a5c4a1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -554,4 +554,6 @@ public final class OzoneConsts {
public static final String OM_SNAPSHOT_DIR = "db.snapshots";
public static final String OM_SNAPSHOT_INDICATOR = ".snapshot";
+ public static final String FILTERED_SNAPSHOTS = "filtered-snapshots";
+
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c10344756a..788bc0e8a5 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3058,6 +3058,28 @@
directory deleting service per time interval.
</description>
</property>
+ <property>
+ <name>ozone.snapshot.filtering.limit.per.task</name>
+ <value>2</value>
+ <tag>OZONE, PERFORMANCE, OM</tag>
+ <description>The maximum number of snapshots to be filtered by
+ the SST filtering service per time interval.
+ </description>
+ </property>
+ <property>
+ <name>ozone.snapshot.filtering.service.interval</name>
+ <value>1m</value>
+ <tag>OZONE, PERFORMANCE, OM</tag>
+ <description>Time interval of the snapshot SST file filtering service.
+ </description>
+ </property>
+ <property>
+ <name>ozone.sst.filtering.service.timeout</name>
+ <value>300000ms</value>
+ <tag>OZONE, PERFORMANCE, OM</tag>
+ <description>Timeout value of the SST filtering service.
+ </description>
+ </property>
<property>
<name>ozone.scm.event.ContainerReport.thread.pool.size</name>
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BooleanTriFunction.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BooleanTriFunction.java
new file mode 100644
index 0000000000..c9ed1ee667
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BooleanTriFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Defines a functional interface that accepts three input arguments and
+ * produces a result (typically a {@code Boolean}).
+ */
+@FunctionalInterface
+public interface BooleanTriFunction<T, U, V, R> {
+
+ R apply(T t, U u, V v);
+
+ default <K> BooleanTriFunction<T, U, V, K> andThen(
+ Function<? super R, ? extends K> after) {
+ Objects.requireNonNull(after);
+ return (T t, U u, V v) -> after.apply(apply(t, u, v));
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java
index b3dbb857ab..a0e300fb7f 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java
@@ -117,6 +117,31 @@ public enum DBProfile {
return cfOptions;
}
+ @Override
+ public ManagedBlockBasedTableConfig getBlockBasedTableConfig() {
+ return SSD.getBlockBasedTableConfig();
+ }
+ },
+ TEST {
+ @Override
+ public String toString() {
+ return "TEST";
+ }
+
+ @Override
+ public ManagedDBOptions getDBOptions() {
+ ManagedDBOptions dbOptions = SSD.getDBOptions();
+ return dbOptions;
+ }
+
+ @Override
+ public ManagedColumnFamilyOptions getColumnFamilyOptions() {
+ ManagedColumnFamilyOptions cfOptions = SSD.getColumnFamilyOptions();
+ cfOptions.setCompactionStyle(CompactionStyle.LEVEL);
+ cfOptions.setDisableAutoCompactions(true);
+ return cfOptions;
+ }
+
@Override
public ManagedBlockBasedTableConfig getBlockBasedTableConfig() {
return SSD.getBlockBasedTableConfig();
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 992627c4cf..adafe079a8 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -42,6 +42,7 @@ import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import com.google.common.base.Preconditions;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.rocksdb.RocksDBException;
+
import org.rocksdb.TransactionLogIterator.BatchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 6f20dcee7b..b96ed1d768 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdds.utils.db;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCheckpoint;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
@@ -32,6 +35,7 @@ import
org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.Holder;
+import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +43,12 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -68,6 +74,9 @@ public final class RocksDatabase {
static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys";
+ private static List<ColumnFamilyHandle> columnFamilyHandles =
+ new ArrayList<>();
+
static IOException toIOException(Object name, String op, RocksDBException e)
{
return HddsServerUtil.toIOException(name + ": Failed to " + op, e);
@@ -136,6 +145,7 @@ public final class RocksDatabase {
db = ManagedRocksDB.open(dbOptions, dbFile.getAbsolutePath(),
descriptors, handles);
}
+ columnFamilyHandles = handles;
// init a column family map.
for (ColumnFamilyHandle h : handles) {
final ColumnFamily f = new ColumnFamily(h);
@@ -359,6 +369,27 @@ public final class RocksDatabase {
}
}
+ /**
+ * Flushes the memtable of the given column family to disk,
+ * waiting for the flush to complete.
+ * @param cfName columnFamily on which flush will run.
+ * @throws IOException if the flush fails.
+ */
+ public void flush(String cfName) throws IOException {
+ ColumnFamilyHandle handle = getColumnFamilyHandle(cfName);
+ try (ManagedFlushOptions options = new ManagedFlushOptions()) {
+ options.setWaitForFlush(true);
+ if (handle != null) {
+ db.get().flush(options, handle);
+ } else {
+ LOG.error("Provided column family doesn't exist."
+ + " Calling flush on null columnFamily");
+ flush();
+ }
+ } catch (RocksDBException e) {
+ closeOnError(e);
+ throw toIOException(this, "flush", e);
+ }
+ }
+
public void flushWal(boolean sync) throws IOException {
try {
db.get().flushWal(sync);
@@ -377,6 +408,41 @@ public final class RocksDatabase {
}
}
+ /**
+ * Runs a full-range compaction on the given column family.
+ * @param cfName columnFamily on which compaction will run.
+ * @throws IOException if the compaction fails.
+ */
+ public void compactRange(String cfName) throws IOException {
+ ColumnFamilyHandle handle = getColumnFamilyHandle(cfName);
+ try {
+ if (handle != null) {
+ db.get().compactRange(handle);
+ } else {
+ LOG.error("Provided column family doesn't exist."
+ + " Calling compactRange on null columnFamily");
+ db.get().compactRange();
+ }
+ } catch (RocksDBException e) {
+ closeOnError(e);
+ throw toIOException(this, "compactRange", e);
+ }
+ }
+
+ private ColumnFamilyHandle getColumnFamilyHandle(String cfName)
+ throws IOException {
+ for (ColumnFamilyHandle cf : getColumnFamilyHandles()) {
+ try {
+ if (cfName.equals(new String(cf.getName(), StandardCharsets.UTF_8))) {
+ return cf;
+ }
+ } catch (RocksDBException e) {
+ closeOnError(e);
+ throw toIOException(this, "columnFamilyHandle.getName", e);
+ }
+ }
+ return null;
+ }
+
RocksCheckpoint createCheckpoint() {
return new RocksCheckpoint();
}
@@ -523,4 +589,70 @@ public final class RocksDatabase {
return name;
}
+ @VisibleForTesting
+ public List<LiveFileMetaData> getSstFileList() {
+ return db.get().getLiveFilesMetaData();
+ }
+
+ /**
+ * Returns the max compaction level of SST files in the DB.
+ * @return the highest level among all live SST files.
+ */
+ private int getLastLevel() {
+ return getSstFileList().stream()
+ .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+ }
+
+ /**
+ * Deletes SST files whose key range does not match the prefix
+ * configured for their table.
+ * @param prefixPairs a list of pairs (tableName, prefixUsed).
+ * @param filterFunction predicate over (firstKey, lastKey, prefix) deciding
+ * whether a key with the prefix may be present in the file.
+ * @throws RocksDBException if RocksDB fails to delete a file.
+ */
+ public void deleteFilesNotMatchingPrefix(
+ List<Pair<String, String>> prefixPairs,
+ BooleanTriFunction<String, String, String, Boolean> filterFunction)
+ throws RocksDBException {
+ for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
+ String sstFileColumnFamily =
+ new String(liveFileMetaData.columnFamilyName(),
+ StandardCharsets.UTF_8);
+ int lastLevel = getLastLevel();
+ for (Pair<String, String> prefixPair : prefixPairs) {
+ String columnFamily = prefixPair.getKey();
+ String prefixForColumnFamily = prefixPair.getValue();
+ if (!sstFileColumnFamily.equals(columnFamily)) {
+ continue;
+ }
+ // RocksDB #deleteFile API allows only to delete the last level of
+ // SST Files. Any level < last level won't get deleted and
+ // only last file of level 0 can be deleted
+ // and will throw warning in the rocksdb manifest.
+ // Instead, perform the level check here
+ // itself to avoid failed delete attempts for lower level files.
+ if (liveFileMetaData.level() != lastLevel || lastLevel == 0) {
+ continue;
+ }
+ String firstDbKey =
+ new String(liveFileMetaData.smallestKey(), StandardCharsets.UTF_8);
+ String lastDbKey =
+ new String(liveFileMetaData.largestKey(), StandardCharsets.UTF_8);
+ boolean isKeyWithPrefixPresent =
+ filterFunction.apply(firstDbKey, lastDbKey, prefixForColumnFamily);
+ if (!isKeyWithPrefixPresent) {
+ String sstFileName = liveFileMetaData.fileName();
+ LOG.info("Deleting sst file {} corresponding to column family"
+ + " {} from db: {}", sstFileName,
+ liveFileMetaData.columnFamilyName(), db.get().getName());
+ db.get().deleteFile(sstFileName);
+ }
+ }
+ }
+ }
+
+ public static List<ColumnFamilyHandle> getColumnFamilyHandles() {
+ return columnFamilyHandles;
+ }
+
+
}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index 0c03876df0..c6af10fbc4 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -34,6 +34,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-common</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 99ace91c80..4ad8a0f9b2 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -29,9 +29,11 @@ import org.rocksdb.CompactionJobInfo;
import org.rocksdb.DBOptions;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
+import org.rocksdb.SstFileReaderIterator;
import org.rocksdb.TableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +51,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -455,29 +458,31 @@ public class RocksDBCheckpointDiffer {
* @return number of keys
*/
private long getSSTFileSummary(String filename) throws RocksDBException {
+ Options option = new Options();
+ SstFileReader reader = new SstFileReader(option);
+
+ reader.open(getAbsoluteSstFilePath(filename));
+
+ TableProperties properties = reader.getTableProperties();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} has {} keys", filename, properties.getNumEntries());
+ }
+ return properties.getNumEntries();
+ }
+ private String getAbsoluteSstFilePath(String filename) {
if (!filename.endsWith(SST_FILE_EXTENSION)) {
filename += SST_FILE_EXTENSION;
}
-
- Options option = new Options();
- SstFileReader reader = new SstFileReader(option);
-
File sstFile = new File(sstBackupDir + filename);
File sstFileInActiveDB = new File(activeDBLocationStr + filename);
if (sstFile.exists()) {
- reader.open(sstBackupDir + filename);
+ return sstBackupDir + filename;
} else if (sstFileInActiveDB.exists()) {
- reader.open(activeDBLocationStr + filename);
+ return activeDBLocationStr + filename;
} else {
throw new RuntimeException("Can't find SST file: " + filename);
}
-
- TableProperties properties = reader.getTableProperties();
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} has {} keys", filename, properties.getNumEntries());
- }
- return properties.getNumEntries();
}
/**
@@ -637,11 +642,14 @@ public class RocksDBCheckpointDiffer {
private final String dbPath;
private final String snapshotID;
private final long snapshotGeneration;
+ private final Map<String, String> tablePrefixes;
- public DifferSnapshotInfo(String db, String id, long gen) {
+ public DifferSnapshotInfo(String db, String id, long gen,
+ Map<String, String> prefixes) {
dbPath = db;
snapshotID = id;
snapshotGeneration = gen;
+ tablePrefixes = prefixes;
}
public String getDbPath() {
@@ -656,6 +664,10 @@ public class RocksDBCheckpointDiffer {
return snapshotGeneration;
}
+ public Map<String, String> getTablePrefixes() {
+ return tablePrefixes;
+ }
+
@Override
public String toString() {
return "DifferSnapshotInfo{" + "dbPath='" + dbPath + '\''
@@ -705,9 +717,45 @@ public class RocksDBCheckpointDiffer {
LOG.debug("{}", logSB);
}
+ if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
+ filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes());
+ }
+
return new ArrayList<>(fwdDAGDifferentFiles);
}
+ public void filterRelevantSstFiles(Set<String> inputFiles,
+ Map<String, String> tableToPrefixMap) {
+ for (String filename : inputFiles) {
+ String filepath = getAbsoluteSstFilePath(filename);
+ try (SstFileReader sstFileReader = new SstFileReader(new Options())) {
+ sstFileReader.open(filepath);
+ TableProperties properties = sstFileReader.getTableProperties();
+ String tableName = new String(properties.getColumnFamilyName(), UTF_8);
+ if (tableToPrefixMap.containsKey(tableName)) {
+ String prefix = tableToPrefixMap.get(tableName);
+ SstFileReaderIterator iterator =
+ sstFileReader.newIterator(new ReadOptions());
+ iterator.seekToFirst();
+ String firstKey = RocksDiffUtils
+ .constructBucketKey(new String(iterator.key(), UTF_8));
+ iterator.seekToLast();
+ String lastKey = RocksDiffUtils
+ .constructBucketKey(new String(iterator.key(), UTF_8));
+ if (!RocksDiffUtils
+ .isKeyWithPrefixPresent(prefix, firstKey, lastKey)) {
+ inputFiles.remove(filename);
+ }
+ } else {
+ // entry from other tables
+ inputFiles.remove(filename);
+ }
+ } catch (RocksDBException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
/**
* Core getSSTDiffList logic.
*/
@@ -1001,4 +1049,5 @@ public class RocksDBCheckpointDiffer {
public static Logger getLog() {
return LOG;
}
+
}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
new file mode 100644
index 0000000000..d6d25bbede
--- /dev/null
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ozone.rocksdiff;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Helper methods for snap-diff operations.
+ */
+public final class RocksDiffUtils {
+
+ private RocksDiffUtils() {
+ }
+
+ public static boolean isKeyWithPrefixPresent(String prefixForColumnFamily,
+ String firstDbKey, String lastDbKey) {
+ return firstDbKey.compareTo(prefixForColumnFamily) <= 0
+ && prefixForColumnFamily.compareTo(lastDbKey) <= 0;
+ }
+
+ public static String constructBucketKey(String keyName) {
+ if (!keyName.startsWith(OzoneConsts.OM_KEY_PREFIX)) {
+ keyName = OzoneConsts.OM_KEY_PREFIX.concat(keyName);
+ }
+ String[] elements = keyName.split(OzoneConsts.OM_KEY_PREFIX);
+ String volume = elements[1];
+ String bucket = elements[2];
+ StringBuilder builder =
+ new StringBuilder().append(OzoneConsts.OM_KEY_PREFIX).append(volume);
+
+ if (StringUtils.isNotBlank(bucket)) {
+ builder.append(OzoneConsts.OM_KEY_PREFIX).append(bucket);
+ }
+ return builder.toString();
+ }
+
+
+}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index 9a4c5c10b0..aa19220042 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -211,7 +211,7 @@ public class TestRocksDBCheckpointDiffer {
createCheckPoint(TEST_DB_PATH, cpPath, rocksDB);
final String snapshotId = "snap_id_" + snapshotGeneration;
final DifferSnapshotInfo currentSnapshot =
- new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration);
+ new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration, null);
this.snapshots.add(currentSnapshot);
// Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index ef94c0359d..105eb67fc6 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -320,6 +320,16 @@ public final class OMConfigKeys {
"ozone.path.deleting.limit.per.task";
public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
+ public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK =
+ "ozone.snapshot.filtering.limit.per.task";
+ public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2;
+
+ public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL =
+ "ozone.snapshot.filtering.service.interval";
+ public static final String
+ OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT = "60s";
+
+
public static final String OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH =
"ozone.om.grpc.maximum.response.length";
/** Default value for GRPC_MAXIMUM_RESPONSE_LENGTH. */
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index d90a1c6e03..de02528e8a 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
@@ -49,7 +50,9 @@ import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -136,7 +139,24 @@ public class TestOMSnapshotDAG {
// Use RocksDB transaction sequence number in SnapshotInfo, which is
// persisted at the time of snapshot creation, as the snapshot generation
return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotID(),
- snapshotInfo.getDbTxSequenceNumber());
+ snapshotInfo.getDbTxSequenceNumber(),
+ getTablePrefixes(omMetadataManager, volumeName, bucketName));
+ }
+
+ private Map<String, String> getTablePrefixes(
+ OMMetadataManager omMetadataManager, String volumeName, String
bucketName)
+ throws IOException {
+ HashMap<String, String> tablePrefixes = new HashMap<>();
+ String volumeId =
String.valueOf(omMetadataManager.getVolumeId(volumeName));
+ String bucketId =
+ String.valueOf(omMetadataManager.getBucketId(volumeName, bucketName));
+ tablePrefixes.put(OmMetadataManagerImpl.KEY_TABLE,
+ OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName);
+ tablePrefixes.put(OmMetadataManagerImpl.FILE_TABLE,
+ OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId);
+ tablePrefixes.put(OmMetadataManagerImpl.DIRECTORY_TABLE,
+ OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId);
+ return tablePrefixes;
}
@Test
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 8bcffa60d2..c0ab7585a4 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -231,4 +231,10 @@ public interface KeyManager extends OzoneManagerFS,
IOzoneAcl {
* @return Background service.
*/
BackgroundService getOpenKeyCleanupService();
+
+ /**
+ * Returns the instance of Snapshot SST Filtering service.
+ * @return Background service.
+ */
+ BackgroundService getSnapshotSstFilteringService();
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a2de68dcc2..e30a3ba280 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -110,6 +110,8 @@ import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_LIST_TRASH_KE
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_LIST_TRASH_KEYS_MAX_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
@@ -117,6 +119,8 @@ import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
@@ -138,6 +142,7 @@ import org.slf4j.LoggerFactory;
public class KeyManagerImpl implements KeyManager {
private static final Logger LOG =
LoggerFactory.getLogger(KeyManagerImpl.class);
+ public static final int DISABLE_VALUE = -1;
/**
* A SCM block client, used to talk to SCM to allocate block during putKey.
@@ -152,6 +157,8 @@ public class KeyManagerImpl implements KeyManager {
private BackgroundService keyDeletingService;
+ private BackgroundService snapshotSstFilteringService;
+
private final KeyProviderCryptoExtension kmsProvider;
private final boolean enableFileSystemPaths;
private BackgroundService dirDeletingService;
@@ -236,6 +243,23 @@ public class KeyManagerImpl implements KeyManager {
TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
openKeyCleanupService.start();
}
+
+ if (snapshotSstFilteringService == null) {
+ long serviceInterval = configuration.getTimeDuration(
+ OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
+ OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = configuration.getTimeDuration(
+ OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT,
+ OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ if (serviceInterval != DISABLE_VALUE) {
+ snapshotSstFilteringService =
+ new SstFilteringService(serviceInterval, TimeUnit.MILLISECONDS,
+ serviceTimeout, ozoneManager, configuration);
+ snapshotSstFilteringService.start();
+ }
+ }
}
KeyProviderCryptoExtension getKMSProvider() {
@@ -256,6 +280,10 @@ public class KeyManagerImpl implements KeyManager {
openKeyCleanupService.shutdown();
openKeyCleanupService = null;
}
+ if (snapshotSstFilteringService != null) {
+ snapshotSstFilteringService.shutdown();
+ snapshotSstFilteringService = null;
+ }
}
private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -570,6 +598,10 @@ public class KeyManagerImpl implements KeyManager {
return openKeyCleanupService;
}
+ public BackgroundService getSnapshotSstFilteringService() {
+ return snapshotSstFilteringService;
+ }
+
@Override
public OmMultipartUploadList listMultipartUploads(String volumeName,
String bucketName, String prefix) throws OMException {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index eb77df3782..a620d451a7 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -1417,6 +1417,15 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
return metadataManager;
}
+ /**
+ * Get the snapshot manager for this Ozone Manager.
+ *
+ * @return the {@link OmSnapshotManager} instance.
+ */
+ public OmSnapshotManager getOmSnapshotManager() {
+ return omSnapshotManager;
+ }
+
/**
* Get metadata manager.
*
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
new file mode 100644
index 0000000000..92bd08780f
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.ozone.rocksdiff.RocksDiffUtils;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SstFilteringService.class);
+
+ // Use only a single thread for SST deletion. Multiple threads would read
+ // or write to same snapshots and can send deletion requests for same sst
+ // multiple times.
+ private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+
+ // Number of files to be batched in an iteration.
+ private final long snapshotLimitPerTask;
+
+ private AtomicLong snapshotFilteredCount;
+
+ private BooleanTriFunction<String, String, String, Boolean> filterFunction =
+ (first, last, prefix) -> {
+ String firstBucketKey = RocksDiffUtils.constructBucketKey(first);
+ String lastBucketKey = RocksDiffUtils.constructBucketKey(last);
+ return RocksDiffUtils
+ .isKeyWithPrefixPresent(prefix, firstBucketKey, lastBucketKey);
+ };
+
+ public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+ serviceTimeout);
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration
+ .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+ SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+ snapshotFilteredCount = new AtomicLong(0);
+ }
+
+ private class SstFilteringTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+
+ Table<String, SnapshotInfo> snapshotInfoTable =
+ ozoneManager.getMetadataManager().getSnapshotInfoTable();
+ try (
+ TableIterator<String, ? extends Table.KeyValue
+ <String, SnapshotInfo>> iterator = snapshotInfoTable
+ .iterator()) {
+ iterator.seekToFirst();
+
+ long snapshotLimit = snapshotLimitPerTask;
+
+ while (iterator.hasNext() && snapshotLimit > 0) {
+ Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
+ String snapShotTableKey = keyValue.getKey();
+ SnapshotInfo snapshotInfo = keyValue.getValue();
+
+ File omMetadataDir =
+ OMStorage.getOmDbDir(ozoneManager.getConfiguration());
+ String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+ Path filePath =
+ Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+
+ // If entry for the snapshotID is present in this file,
+ // it has already undergone filtering.
+ if (Files.exists(filePath)) {
+ List<String> processedSnapshotIds = Files.readAllLines(filePath);
+ if (processedSnapshotIds.contains(snapshotInfo.getSnapshotID())) {
+ continue;
+ }
+ }
+
+ LOG.debug("Processing snapshot {} to filter relevant SST Files",
+ snapShotTableKey);
+
+ List<Pair<String, String>> prefixPairs =
+ constructPrefixPairs(snapshotInfo);
+
+ String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+
+ RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
+ .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
+ dbName, true);
+ RocksDatabase db = rdbStore.getDb();
+ db.deleteFilesNotMatchingPrefix(prefixPairs, filterFunction);
+
+ // mark the snapshot as filtered by writing to the file
+ String content = snapshotInfo.getSnapshotID() + "\n";
+ Files.write(filePath, content.getBytes(StandardCharsets.UTF_8),
+ StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+ snapshotLimit--;
+ snapshotFilteredCount.getAndIncrement();
+ }
+ } catch (RocksDBException | IOException e) {
+ LOG.error("Error during Snapshot sst filtering ", e);
+ }
+
+ // nothing to return here
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
+ /**
+ * @param snapshotInfo
+ * @return a list of pairs (tableName,keyPrefix).
+ * @throws IOException
+ */
+ private List<Pair<String, String>> constructPrefixPairs(
+ SnapshotInfo snapshotInfo) throws IOException {
+ String volumeName = snapshotInfo.getVolumeName();
+ String bucketName = snapshotInfo.getBucketName();
+
+ long volumeId =
ozoneManager.getMetadataManager().getVolumeId(volumeName);
+ // TODO : HDDS-6984 buckets can be deleted via ofs
+ // handle deletion of bucket case.
+ long bucketId =
+ ozoneManager.getMetadataManager().getBucketId(volumeName,
bucketName);
+
+ String filterPrefix =
+ OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName;
+
+ String filterPrefixFSO =
+ OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId;
+
+ List<Pair<String, String>> prefixPairs = new ArrayList<>();
+ prefixPairs
+ .add(Pair.of(OmMetadataManagerImpl.KEY_TABLE, filterPrefix));
+ prefixPairs.add(
+ Pair.of(OmMetadataManagerImpl.DIRECTORY_TABLE, filterPrefixFSO));
+ prefixPairs
+ .add(Pair.of(OmMetadataManagerImpl.FILE_TABLE, filterPrefixFSO));
+ return prefixPairs;
+ }
+ }
+
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ queue.add(new SstFilteringTask());
+ return queue;
+ }
+
+ public AtomicLong getSnapshotFilteredCount() {
+ return snapshotFilteredCount;
+ }
+
+}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
index 4089a07095..dfd519b26f 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -228,9 +228,11 @@ public class TestKeyDeletingService {
// Create 2 versions of the same key
String keyName = String.format("key%s",
RandomStringUtils.randomAlphanumeric(5));
- OmKeyArgs keyArgs = createAndCommitKey(keyManager, volumeName, bucketName,
- keyName, 1);
- createAndCommitKey(keyManager, volumeName, bucketName, keyName, 2);
+ OmKeyArgs keyArgs =
+ createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
+ keyName, 1);
+ createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
keyName,
+ 2);
// Delete the key
writeClient.deleteKey(keyArgs);
@@ -266,15 +268,16 @@ public class TestKeyDeletingService {
createVolumeAndBucket(keyManager, volumeName, bucketName, false);
// Create the key
- OmKeyArgs keyArg = createAndCommitKey(keyManager, volumeName, bucketName,
- keyName, numBlocks);
+ OmKeyArgs keyArg =
+ createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
+ keyName, numBlocks);
// Delete the key
writeClient.deleteKey(keyArg);
}
}
- private void createVolumeAndBucket(KeyManager keyManager, String volumeName,
+ static void createVolumeAndBucket(KeyManager keyManager, String volumeName,
String bucketName, boolean isVersioningEnabled) throws IOException {
// cheat here, just create a volume and bucket entry so that we can
// create the keys, we put the same data for key and value since the
@@ -293,7 +296,8 @@ public class TestKeyDeletingService {
.build());
}
- private OmKeyArgs createAndCommitKey(KeyManager keyManager, String
volumeName,
+ static OmKeyArgs createAndCommitKey(OzoneManagerProtocol writeClient,
+ KeyManager keyManager, String volumeName,
String bucketName, String keyName, int numBlocks) throws IOException {
OmKeyArgs keyArg =
new OmKeyArgs.Builder()
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
new file mode 100644
index 0000000000..35dc07a9ad
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.hdds.utils.db.DBProfile;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
+
+/**
+ * Test SST Filtering Service.
+ */
+public class TestSstFilteringService {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+ private OzoneManagerProtocol writeClient;
+ private OzoneManager om;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSstFilteringService.class);
+
+ @BeforeClass
+ public static void setup() {
+ ExitUtils.disableSystemExit();
+ }
+
+ private OzoneConfiguration createConfAndInitValues() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ File newFolder = folder.newFolder();
+ if (!newFolder.exists()) {
+ Assert.assertTrue(newFolder.mkdirs());
+ }
+ System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+ ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 100,
+ TimeUnit.MILLISECONDS);
+ conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
+ conf.setQuietMode(false);
+
+ return conf;
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ om.stop();
+ }
+
+ /**
+ * Test checks whether for existing snapshots
+ * the checkpoint should not have any sst files that do not correspond to
+ * the bucket on which create snapshot command was issued.
+ *
+ * The SSTFiltering service deletes only the last level of
+ * sst file (rocksdb behaviour).
+ *
+ * 1. Create Keys for vol1/buck1 (L0 ssts will be created for vol1/buck1)
+ * 2. compact the db (new level SSTS will be created for vol1/buck1)
+ * 3. Create keys for vol1/buck2 (L0 ssts will be created for vol1/buck2)
+ * 4. Take snapshot on vol1/buck2.
+ * 5. The snapshot will contain compacted sst files pertaining to vol1/buck1
+ * Wait till the BG service deletes these.
+ *
+ * @throws IOException - on Failure.
+ */
+
+ @Test
+ public void testIrrelevantSstFileDeletion()
+ throws IOException, TimeoutException, InterruptedException,
+ AuthenticationException {
+ OzoneConfiguration conf = createConfAndInitValues();
+ OmTestManagers omTestManagers = new OmTestManagers(conf);
+ KeyManager keyManager = omTestManagers.getKeyManager();
+ writeClient = omTestManagers.getWriteClient();
+ om = omTestManagers.getOzoneManager();
+ RDBStore store = (RDBStore) om.getMetadataManager().getStore();
+
+ final int keyCount = 100;
+ createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1);
+ SstFilteringService sstFilteringService =
+ (SstFilteringService) keyManager.getSnapshotSstFilteringService();
+
+ String rocksDbDir = om.getRocksDbDirectory();
+
+ store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
+
+ createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1);
+ store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
+
+ int level0FilesCount = 0;
+ int totalFileCount = 0;
+
+ List<LiveFileMetaData> initialsstFileList = store.getDb().getSstFileList();
+ for (LiveFileMetaData fileMetaData : initialsstFileList) {
+ totalFileCount++;
+ if (fileMetaData.level() == 0) {
+ level0FilesCount++;
+ }
+ }
+ LOG.debug("Total files : {}", totalFileCount);
+ LOG.debug("Total L0 files: {}", level0FilesCount);
+
+ Assert.assertEquals(totalFileCount, level0FilesCount);
+
+ store.getDb().compactRange(OmMetadataManagerImpl.KEY_TABLE);
+
+ int level0FilesCountAfterCompact = 0;
+ int totalFileCountAfterCompact = 0;
+ int nonlevel0FilesCountAfterCompact = 0;
+ List<LiveFileMetaData> nonlevelOFiles = new ArrayList<>();
+
+ for (LiveFileMetaData fileMetaData : store.getDb().getSstFileList()) {
+ totalFileCountAfterCompact++;
+ if (fileMetaData.level() == 0) {
+ level0FilesCountAfterCompact++;
+ } else {
+ nonlevel0FilesCountAfterCompact++;
+ nonlevelOFiles.add(fileMetaData);
+ }
+ }
+
+ LOG.debug("Total files : {}", totalFileCountAfterCompact);
+ LOG.debug("Total L0 files: {}", level0FilesCountAfterCompact);
+ LOG.debug("Total non L0/compacted files: {}",
+ nonlevel0FilesCountAfterCompact);
+
+ Assert.assertTrue(nonlevel0FilesCountAfterCompact > 0);
+
+ createKeys(keyManager, "vol1", "buck2", keyCount, 1);
+
+ store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
+
+ List<LiveFileMetaData> allFiles = store.getDb().getSstFileList();
+
+ writeClient.createSnapshot("vol1", "buck2", "snapshot1");
+
+ GenericTestUtils.waitFor(
+ () -> sstFilteringService.getSnapshotFilteredCount().get() >= 1, 1000,
+ 10000);
+
+ Assert
+ .assertEquals(1, sstFilteringService.getSnapshotFilteredCount().get());
+
+ SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
+ .get(SnapshotInfo.getTableKey("vol1", "buck2", "snapshot1"));
+
+ String dbSnapshots = rocksDbDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+ String snapshotDirName =
+ dbSnapshots + OM_KEY_PREFIX + OM_DB_NAME + snapshotInfo
+ .getCheckpointDirName();
+
+ for (LiveFileMetaData file : allFiles) {
+ File sstFile =
+ new File(snapshotDirName + OM_KEY_PREFIX + file.fileName());
+ if (nonlevelOFiles.stream()
+ .anyMatch(o -> file.fileName().equals(o.fileName()))) {
+ Assert.assertFalse(sstFile.exists());
+ } else {
+ Assert.assertTrue(sstFile.exists());
+ }
+ }
+
+ List<String> processedSnapshotIds = Files
+ .readAllLines(Paths.get(dbSnapshots, OzoneConsts.FILTERED_SNAPSHOTS));
+ Assert.assertTrue(
+ processedSnapshotIds.contains(snapshotInfo.getSnapshotID()));
+
+ }
+
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
+ private void createKeys(KeyManager keyManager, String volumeName,
+ String bucketName, int keyCount, int numBlocks) throws IOException {
+ for (int x = 0; x < keyCount; x++) {
+ String keyName =
+ String.format("key%s", RandomStringUtils.randomAlphanumeric(5));
+ // Create Volume and Bucket
+ TestKeyDeletingService
+ .createVolumeAndBucket(keyManager, volumeName, bucketName, false);
+
+ // Create the key
+ TestKeyDeletingService
+ .createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
+ keyName, numBlocks);
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 14d1a473fe..c5b4a75574 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1836,6 +1836,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xs
<allowedImport>org.rocksdb.StatsLevel</allowedImport>
<allowedImport>org.rocksdb.TransactionLogIterator.BatchResult</allowedImport>
<allowedImport>org.rocksdb.TickerType</allowedImport>
+
<allowedImport>org.rocksdb.LiveFileMetaData</allowedImport>
<!-- Allow RocksObjects whose native pointer is managed
by RocksDB. -->
<allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]