This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new e0de9345 feat(config): add `rocksdb.compression_start_level` to allow 
configure levels without compression (#2605)
e0de9345 is described below

commit e0de9345c01f3d9ddbe48d5be9b50887330acfba
Author: Egor <[email protected]>
AuthorDate: Mon Oct 21 20:01:31 2024 +0700

    feat(config): add `rocksdb.compression_start_level` to allow configure 
levels without compression (#2605)
    
    Co-authored-by: hulk <[email protected]>
---
 kvrocks.conf                 |  9 ++++++
 src/config/config.cc         | 67 +++++++++++++++++++++++++++++---------------
 src/config/config.h          |  2 ++
 src/storage/storage.cc       |  5 ++--
 tests/cppunit/config_test.cc |  1 +
 5 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index 3c460c88..e7de065c 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -814,6 +814,15 @@ rocksdb.compression_level 32767
 # Default: 2 MB
 rocksdb.compaction_readahead_size 2097152
 
+# Enable compression starting from level n of the LSM-tree.
+# By default, compression is disabled for the first two levels (L0 and L1)
+# because they may contain frequently accessed data, so it is better
+# to keep that data uncompressed to save CPU.
+# Value: [0, 7) (the upper bound is the maximum number of LSM levels in kvrocks)
+#
+# Default: 2
+rocksdb.compression_start_level 2
+
 # he limited write rate to DB if soft_pending_compaction_bytes_limit or
 # level0_slowdown_writes_trigger is triggered.
 
diff --git a/src/config/config.cc b/src/config/config.cc
index 93866bca..ce523982 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -105,6 +105,37 @@ std::string TrimRocksDbPrefix(std::string s) {
   return s.substr(8, s.size() - 8);
 }
 
+Status SetRocksdbCompression(Server *srv, const rocksdb::CompressionType 
compression,
+                             const int compression_start_level) {
+  if (!srv) return Status::OK();
+  std::string compression_option;
+  for (auto &option : engine::CompressionOptions) {
+    if (option.type == compression) {
+      compression_option = option.val;
+      break;
+    }
+  }
+  if (compression_option.empty()) {
+    return {Status::NotOK, "Invalid compression type"};
+  }
+
+  if (compression_start_level >= KVROCKS_MAX_LSM_LEVEL) {
+    return {Status::NotOK, "compression_start_level must <= rocksdb levels 
count"};
+  }
+  std::vector<std::string> compression_per_level_builder;
+  compression_per_level_builder.reserve(KVROCKS_MAX_LSM_LEVEL);
+
+  for (int i = 0; i < compression_start_level; i++) {
+    compression_per_level_builder.emplace_back("kNoCompression");
+  }
+  for (int i = compression_start_level; i < KVROCKS_MAX_LSM_LEVEL; i++) {
+    compression_per_level_builder.emplace_back(compression_option);
+  }
+  const std::string compression_per_level = util::StringJoin(
+      compression_per_level_builder, [](const auto &s) -> decltype(auto) { 
return s; }, ":");
+  return srv->storage->SetOptionForAllColumnFamilies("compression_per_level", 
compression_per_level);
+};
+
 Config::Config() {
   struct FieldWrapper {
     std::string name;
@@ -211,6 +242,8 @@ Config::Config() {
        new EnumField<rocksdb::CompressionType>(&rocks_db.compression, 
compression_types,
                                                
rocksdb::CompressionType::kNoCompression)},
       {"rocksdb.compression_level", true, new 
IntField(&rocks_db.compression_level, 32767, INT_MIN, INT_MAX)},
+      {"rocksdb.compression_start_level", false,
+       new IntField(&rocks_db.compression_start_level, 2, 0, 
KVROCKS_MAX_LSM_LEVEL - 1)},
       {"rocksdb.block_size", true, new IntField(&rocks_db.block_size, 16384, 
0, INT_MAX)},
       {"rocksdb.max_open_files", false, new IntField(&rocks_db.max_open_files, 
8096, -1, INT_MAX)},
       {"rocksdb.write_buffer_size", false, new 
IntField(&rocks_db.write_buffer_size, 64, 0, 4096)},
@@ -380,31 +413,20 @@ void Config::initFieldCallback() {
     if (!srv) return Status::OK();  // srv is nullptr when load config from 
file
     return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
v);
   };
+
   auto set_compression_type_cb = [](Server *srv, [[maybe_unused]] const 
std::string &k,
-                                    const std::string &v) -> Status {
+                                    [[maybe_unused]] const std::string &v) -> 
Status {
     if (!srv) return Status::OK();
-
-    std::string compression_option;
-    for (auto &option : engine::CompressionOptions) {
-      if (option.name == v) {
-        compression_option = option.val;
-        break;
-      }
-    }
-    if (compression_option.empty()) {
-      return {Status::NotOK, "Invalid compression type"};
-    }
-
-    // For the first two levels, it may contain the frequently accessed data,
-    // so it'd be better to use uncompressed data to save the CPU.
-    std::string compression_levels = "kNoCompression:kNoCompression";
-    auto db = srv->storage->GetDB();
-    for (size_t i = 2; i < db->GetOptions().compression_per_level.size(); i++) 
{
-      compression_levels += ":";
-      compression_levels += compression_option;
-    }
-    return 
srv->storage->SetOptionForAllColumnFamilies("compression_per_level", 
compression_levels);
+    return SetRocksdbCompression(srv, srv->GetConfig()->rocks_db.compression,
+                                 
srv->GetConfig()->rocks_db.compression_start_level);
   };
+  auto set_compression_start_level_cb = [](Server *srv, [[maybe_unused]] const 
std::string &k,
+                                           [[maybe_unused]] const std::string 
&v) -> Status {
+    if (!srv) return Status::OK();
+    return SetRocksdbCompression(srv, srv->GetConfig()->rocks_db.compression,
+                                 
srv->GetConfig()->rocks_db.compression_start_level);
+  };
+
 #ifdef ENABLE_OPENSSL
   auto set_tls_option = [](Server *srv, [[maybe_unused]] const std::string &k, 
[[maybe_unused]] const std::string &v) {
     if (!srv) return Status::OK();  // srv is nullptr when load config from 
file
@@ -709,6 +731,7 @@ void Config::initFieldCallback() {
           {"rocksdb.level0_stop_writes_trigger", set_cf_option_cb},
           {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb},
           {"rocksdb.compression", set_compression_type_cb},
+          {"rocksdb.compression_start_level", set_compression_start_level_cb},
 #ifdef ENABLE_OPENSSL
           {"tls-cert-file", set_tls_option},
           {"tls-key-file", set_tls_option},
diff --git a/src/config/config.h b/src/config/config.h
index 5969ef55..bc33ac97 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -54,6 +54,7 @@ constexpr const size_t GiB = 1024L * MiB;
 constexpr const uint32_t kDefaultPort = 6666;
 
 constexpr const char *kDefaultNamespace = "__namespace";
+constexpr const size_t KVROCKS_MAX_LSM_LEVEL = 7;
 
 enum class BlockCacheType { kCacheTypeLRU = 0, kCacheTypeHCC };
 
@@ -200,6 +201,7 @@ struct Config {
     int level0_stop_writes_trigger;
     int level0_file_num_compaction_trigger;
     rocksdb::CompressionType compression;
+    int compression_start_level;
     int compression_level;
     bool disable_auto_compactions;
     bool enable_blob_files;
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 9e1b315d..2eead08a 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -165,13 +165,12 @@ rocksdb::Options Storage::InitRocksDBOptions() {
   options.max_write_buffer_number = config_->rocks_db.max_write_buffer_number;
   options.min_write_buffer_number_to_merge = 2;
   options.write_buffer_size = config_->rocks_db.write_buffer_size * MiB;
-  options.num_levels = 7;
+  options.num_levels = KVROCKS_MAX_LSM_LEVEL;
   options.compression_opts.level = config_->rocks_db.compression_level;
   options.compression_per_level.resize(options.num_levels);
   options.wal_compression = config_->rocks_db.wal_compression;
-  // only compress levels >= 2
   for (int i = 0; i < options.num_levels; ++i) {
-    if (i < 2) {
+    if (i < config_->rocks_db.compression_start_level) {
       options.compression_per_level[i] = 
rocksdb::CompressionType::kNoCompression;
     } else {
       options.compression_per_level[i] = config_->rocks_db.compression;
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index dfe63118..4e9b3817 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -82,6 +82,7 @@ TEST(Config, GetAndSet) {
       {"rocksdb.max_bytes_for_level_multiplier", "10"},
       {"rocksdb.level_compaction_dynamic_level_bytes", "yes"},
       {"rocksdb.max_background_jobs", "4"},
+      {"rocksdb.compression_start_level", "2"},
   };
   std::vector<std::string> values;
   for (const auto &iter : mutable_cases) {

Reply via email to