This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 90e05f19 Refactor Column Family Handling (#2296)
90e05f19 is described below

commit 90e05f19a20cf8a852dd51059a7f5fbc23d6fb76
Author: mwish <[email protected]>
AuthorDate: Tue May 14 19:34:57 2024 +0800

    Refactor Column Family Handling (#2296)
---
 src/cluster/replication.cc                         |   7 +-
 src/cluster/slot_migrate.cc                        |  18 +++-
 src/search/executors/full_index_scan_executor.h    |   3 +-
 src/search/executors/numeric_field_scan_executor.h |   3 +-
 src/search/executors/tag_field_scan_executor.h     |   3 +-
 src/search/indexer.cc                              |   4 +-
 src/server/namespace.h                             |   5 +-
 src/server/server.cc                               |  17 ++--
 src/stats/disk_stats.cc                            |  16 +--
 src/storage/batch_extractor.cc                     |  16 +--
 src/storage/compaction_checker.cc                  |  15 +--
 src/storage/compaction_checker.h                   |   2 +-
 src/storage/iterator.cc                            |  10 +-
 src/storage/redis_db.cc                            |   9 +-
 src/storage/redis_pubsub.h                         |   3 +-
 src/storage/scripting.cc                           |   6 +-
 src/storage/storage.cc                             |  52 ++++------
 src/storage/storage.h                              | 110 +++++++++++++++++----
 src/types/redis_stream.h                           |   2 +-
 src/types/redis_string.cc                          |   4 +-
 src/types/redis_zset.h                             |   2 +-
 tests/cppunit/compact_test.cc                      |  18 ++--
 tests/cppunit/indexer_test.cc                      |   4 +-
 tests/cppunit/iterator_test.cc                     |  28 +++---
 utils/kvrocks2redis/parser.cc                      |   2 +-
 25 files changed, 212 insertions(+), 147 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 57d8b9bc..ea9c5587 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -1011,17 +1011,18 @@ bool ReplicationThread::isUnknownOption(const char 
*err) { return std::string(er
 rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const 
rocksdb::Slice &key,
                                          const rocksdb::Slice &value) {
   type_ = kBatchTypeNone;
-  if (column_family_id == kColumnFamilyIDPubSub) {
+  if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::PubSub)) {
     type_ = kBatchTypePublish;
     kv_ = std::make_pair(key.ToString(), value.ToString());
     return rocksdb::Status::OK();
-  } else if (column_family_id == kColumnFamilyIDPropagate) {
+  } else if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Propagate)) {
     type_ = kBatchTypePropagate;
     kv_ = std::make_pair(key.ToString(), value.ToString());
     return rocksdb::Status::OK();
-  } else if (column_family_id == kColumnFamilyIDStream) {
+  } else if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
     type_ = kBatchTypeStream;
     kv_ = std::make_pair(key.ToString(), value.ToString());
+    return rocksdb::Status::OK();
   }
   return rocksdb::Status::OK();
 }
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index a9c466a4..a074d536 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -342,7 +342,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
   read_options.snapshot = slot_snapshot_;
   Slice prefix_slice(prefix);
   read_options.iterate_lower_bound = &prefix_slice;
-  rocksdb::ColumnFamilyHandle *cf_handle = 
storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
+  rocksdb::ColumnFamilyHandle *cf_handle = 
storage_->GetCFHandle(ColumnFamilyID::Metadata);
   auto iter = 
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
 
   // Seek to the beginning of keys start with 'prefix' and iterate all these 
keys
@@ -851,8 +851,8 @@ Status SlotMigrator::migrateStream(const Slice &key, const 
StreamMetadata &metad
   read_options.iterate_lower_bound = &prefix_key_slice;
 
   // Should use th raw db iterator to avoid reading uncommitted writes in 
transaction mode
-  auto iter = util::UniqueIterator(
-      storage_->GetDB()->NewIterator(read_options, 
storage_->GetCFHandle(engine::kStreamColumnFamilyName)));
+  auto iter =
+      util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, 
storage_->GetCFHandle(ColumnFamilyID::Stream)));
 
   std::vector<std::string> user_cmd = {type_to_cmd[metadata.Type()], 
key.ToString()};
 
@@ -1224,7 +1224,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {
     }
     batch_sender.SetPrefixLogData(log_data);
 
-    
GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(engine::kMetadataColumnFamilyName),
 iter.Key(), iter.Value()));
+    
GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::Metadata), 
iter.Key(), iter.Value()));
 
     auto subkey_iter = iter.GetSubKeyIterator();
     if (!subkey_iter) {
@@ -1240,7 +1240,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {
         score_key.append(subkey_iter->UserKey().ToString());
         auto score_key_bytes =
             InternalKey(iter.Key(), score_key, internal_key.GetVersion(), 
storage_->IsSlotIdEncoded()).Encode();
-        
GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(kColumnFamilyIDZSetScore), 
score_key_bytes, Slice()));
+        
GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey),
 score_key_bytes, Slice()));
       }
 
       if (batch_sender.IsFull()) {
@@ -1334,11 +1334,19 @@ Status 
SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender
         break;
       }
       case engine::WALItem::Type::kTypePut: {
+        if (item.column_family_id > kMaxColumnFamilyID) {
+          LOG(INFO) << fmt::format("[migrate] Invalid put column family id: 
{}", item.column_family_id);
+          continue;
+        }
         
GET_OR_RET(batch_sender->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(item.column_family_id)),
                                      item.key, item.value));
         break;
       }
       case engine::WALItem::Type::kTypeDelete: {
+        if (item.column_family_id > kMaxColumnFamilyID) {
+          LOG(INFO) << fmt::format("[migrate] Invalid delete column family id: 
{}", item.column_family_id);
+          continue;
+        }
         GET_OR_RET(
             
batch_sender->Delete(storage_->GetCFHandle(static_cast<ColumnFamilyID>(item.column_family_id)),
 item.key));
         break;
diff --git a/src/search/executors/full_index_scan_executor.h 
b/src/search/executors/full_index_scan_executor.h
index 0afeae04..3fde9ef5 100644
--- a/src/search/executors/full_index_scan_executor.h
+++ b/src/search/executors/full_index_scan_executor.h
@@ -50,8 +50,7 @@ struct FullIndexScanExecutor : ExecutorNode {
     if (!iter) {
       rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
       read_options.snapshot = ss.GetSnapShot();
-      iter = util::UniqueIterator(ctx->storage, read_options,
-                                  
ctx->storage->GetCFHandle(engine::kMetadataColumnFamilyName));
+      iter = util::UniqueIterator(ctx->storage, read_options, 
ctx->storage->GetCFHandle(ColumnFamilyID::Metadata));
       iter->Seek(ns_key);
     }
 
diff --git a/src/search/executors/numeric_field_scan_executor.h 
b/src/search/executors/numeric_field_scan_executor.h
index 1609f433..970ef106 100644
--- a/src/search/executors/numeric_field_scan_executor.h
+++ b/src/search/executors/numeric_field_scan_executor.h
@@ -76,8 +76,7 @@ struct NumericFieldScanExecutor : ExecutorNode {
       rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
       read_options.snapshot = ss.GetSnapShot();
 
-      iter =
-          util::UniqueIterator(ctx->storage, read_options, 
ctx->storage->GetCFHandle(engine::kSearchColumnFamilyName));
+      iter = util::UniqueIterator(ctx->storage, read_options, 
ctx->storage->GetCFHandle(ColumnFamilyID::Search));
       if (scan->order == SortByClause::ASC) {
         iter->Seek(IndexKey(scan->range.l));
       } else {
diff --git a/src/search/executors/tag_field_scan_executor.h 
b/src/search/executors/tag_field_scan_executor.h
index a3781c11..ddedffc9 100644
--- a/src/search/executors/tag_field_scan_executor.h
+++ b/src/search/executors/tag_field_scan_executor.h
@@ -76,8 +76,7 @@ struct TagFieldScanExecutor : ExecutorNode {
       rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
       read_options.snapshot = ss.GetSnapShot();
 
-      iter =
-          util::UniqueIterator(ctx->storage, read_options, 
ctx->storage->GetCFHandle(engine::kSearchColumnFamilyName));
+      iter = util::UniqueIterator(ctx->storage, read_options, 
ctx->storage->GetCFHandle(ColumnFamilyID::Search));
       iter->Seek(index_key);
     }
 
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 4a4a949c..4bce72de 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -159,7 +159,7 @@ Status IndexUpdater::UpdateIndex(const std::string &field, 
std::string_view key,
     }
 
     auto batch = storage->GetWriteBatchBase();
-    auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);
+    auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);
 
     for (const auto &tag : tags_to_delete) {
       auto sub_key = ConstructTagFieldSubkey(field, tag, key);
@@ -179,7 +179,7 @@ Status IndexUpdater::UpdateIndex(const std::string &field, 
std::string_view key,
     if (!s.ok()) return {Status::NotOK, s.ToString()};
   } else if (auto numeric [[maybe_unused]] = 
dynamic_cast<SearchNumericFieldMetadata *>(metadata)) {
     auto batch = storage->GetWriteBatchBase();
-    auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);
+    auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);
 
     if (!original.empty()) {
       auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), 
original.end())));
diff --git a/src/server/namespace.h b/src/server/namespace.h
index 943a3ece..c0556eeb 100644
--- a/src/server/namespace.h
+++ b/src/server/namespace.h
@@ -26,9 +26,8 @@ constexpr const char *kNamespaceDBKey = "__namespace_keys__";
 
 class Namespace {
  public:
-  explicit Namespace(engine::Storage *storage) : storage_(storage) {
-    cf_ = storage_->GetCFHandle(engine::kPropagateColumnFamilyName);
-  }
+  explicit Namespace(engine::Storage *storage)
+      : storage_(storage), 
cf_(storage_->GetCFHandle(ColumnFamilyID::Propagate)) {}
 
   ~Namespace() = default;
   Namespace(const Namespace &) = delete;
diff --git a/src/server/server.cc b/src/server/server.cc
index 7c50a008..0ebc142c 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -195,10 +195,9 @@ Status Server::Start() {
         auto now_hours = util::GetTimeStamp<std::chrono::hours>();
         if (now_hours >= config_->compaction_checker_range.start &&
             now_hours <= config_->compaction_checker_range.stop) {
-          std::vector<std::string> cf_names = 
{engine::kMetadataColumnFamilyName, engine::kSubkeyColumnFamilyName,
-                                               
engine::kZSetScoreColumnFamilyName, engine::kStreamColumnFamilyName};
-          for (const auto &cf_name : cf_names) {
-            compaction_checker.PickCompactionFiles(cf_name);
+          const auto &column_family_list = 
engine::ColumnFamilyConfigs::ListAllColumnFamilies();
+          for (auto &column_family : column_family_list) {
+            compaction_checker.PickCompactionFilesForCf(column_family);
           }
         }
         // compact once per day
@@ -850,7 +849,7 @@ void Server::GetRocksDBInfo(std::string *info) {
     // All column families share the same block cache, so it's good to count a 
single one.
     uint64_t block_cache_usage = 0;
     uint64_t block_cache_pinned_usage = 0;
-    auto subkey_cf_handle = 
storage->GetCFHandle(engine::kSubkeyColumnFamilyName);
+    auto subkey_cf_handle = 
storage->GetCFHandle(ColumnFamilyID::PrimarySubkey);
     db->GetIntProperty(subkey_cf_handle, 
rocksdb::DB::Properties::kBlockCacheUsage, &block_cache_usage);
     string_stream << "block_cache_usage:" << block_cache_usage << "\r\n";
     db->GetIntProperty(subkey_cf_handle, 
rocksdb::DB::Properties::kBlockCachePinnedUsage, &block_cache_pinned_usage);
@@ -1651,7 +1650,7 @@ Status Server::ScriptExists(const std::string &sha) {
 
 Status Server::ScriptGet(const std::string &sha, std::string *body) const {
   std::string func_name = engine::kLuaFuncSHAPrefix + sha;
-  auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+  auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
   auto s = storage->Get(rocksdb::ReadOptions(), cf, func_name, body);
   if (!s.ok()) {
     return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
@@ -1666,7 +1665,7 @@ Status Server::ScriptSet(const std::string &sha, const 
std::string &body) const
 
 Status Server::FunctionGetCode(const std::string &lib, std::string *code) 
const {
   std::string func_name = engine::kLuaLibCodePrefix + lib;
-  auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+  auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
   auto s = storage->Get(rocksdb::ReadOptions(), cf, func_name, code);
   if (!s.ok()) {
     return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
@@ -1676,7 +1675,7 @@ Status Server::FunctionGetCode(const std::string &lib, 
std::string *code) const
 
 Status Server::FunctionGetLib(const std::string &func, std::string *lib) const 
{
   std::string func_name = engine::kLuaFuncLibPrefix + func;
-  auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+  auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
   auto s = storage->Get(rocksdb::ReadOptions(), cf, func_name, lib);
   if (!s.ok()) {
     return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
@@ -1700,7 +1699,7 @@ void Server::ScriptReset() {
 }
 
 Status Server::ScriptFlush() {
-  auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+  auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
   auto s = storage->FlushScripts(storage->DefaultWriteOptions(), cf);
   if (!s.ok()) return {Status::NotOK, s.ToString()};
   ScriptReset();
diff --git a/src/stats/disk_stats.cc b/src/stats/disk_stats.cc
index 9eda0950..8c7f98dd 100644
--- a/src/stats/disk_stats.cc
+++ b/src/stats/disk_stats.cc
@@ -79,14 +79,14 @@ rocksdb::Status Disk::GetHashSize(const Slice &ns_key, 
uint64_t *key_size) {
   HashMetadata metadata(false);
   rocksdb::Status s = Database::GetMetadata(Database::GetOptions{}, 
{kRedisHash}, ns_key, &metadata);
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
-  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
+  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size);
 }
 
 rocksdb::Status Disk::GetSetSize(const Slice &ns_key, uint64_t *key_size) {
   SetMetadata metadata(false);
   rocksdb::Status s = Database::GetMetadata(Database::GetOptions{}, 
{kRedisSet}, ns_key, &metadata);
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
-  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
+  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size);
 }
 
 rocksdb::Status Disk::GetListSize(const Slice &ns_key, uint64_t *key_size) {
@@ -95,7 +95,7 @@ rocksdb::Status Disk::GetListSize(const Slice &ns_key, 
uint64_t *key_size) {
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
   std::string buf;
   PutFixed64(&buf, metadata.head);
-  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size, buf);
+  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size, buf);
 }
 
 rocksdb::Status Disk::GetZsetSize(const Slice &ns_key, uint64_t *key_size) {
@@ -104,17 +104,17 @@ rocksdb::Status Disk::GetZsetSize(const Slice &ns_key, 
uint64_t *key_size) {
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
   std::string score_bytes;
   PutDouble(&score_bytes, kMinScore);
-  s = GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(engine::kZSetScoreColumnFamilyName), key_size,
+  s = GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey), key_size,
                           score_bytes, score_bytes);
   if (!s.ok()) return s;
-  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
+  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size);
 }
 
 rocksdb::Status Disk::GetBitmapSize(const Slice &ns_key, uint64_t *key_size) {
   BitmapMetadata metadata(false);
   rocksdb::Status s = Database::GetMetadata(Database::GetOptions{}, 
{kRedisBitmap}, ns_key, &metadata);
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
-  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size,
+  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size,
                              std::to_string(0), std::to_string(0));
 }
 
@@ -124,7 +124,7 @@ rocksdb::Status Disk::GetSortedintSize(const Slice &ns_key, 
uint64_t *key_size)
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
   std::string start_buf;
   PutFixed64(&start_buf, 0);
-  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size,
+  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size,
                              start_buf, start_buf);
 }
 
@@ -132,7 +132,7 @@ rocksdb::Status Disk::GetStreamSize(const Slice &ns_key, 
uint64_t *key_size) {
   StreamMetadata metadata(false);
   rocksdb::Status s = Database::GetMetadata(Database::GetOptions{}, 
{kRedisStream}, ns_key, &metadata);
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
-  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(engine::kStreamColumnFamilyName), key_size);
+  return GetApproximateSizes(metadata, ns_key, 
storage_->GetCFHandle(ColumnFamilyID::Stream), key_size);
 }
 
 }  // namespace redis
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index f60669eb..c1559b1a 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -44,14 +44,14 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice 
&blob) {
 }
 
 rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const 
Slice &key, const Slice &value) {
-  if (column_family_id == kColumnFamilyIDZSetScore) {
+  if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::SecondarySubkey)) {
     return rocksdb::Status::OK();
   }
 
   std::string ns, user_key;
   std::vector<std::string> command_args;
 
-  if (column_family_id == kColumnFamilyIDMetadata) {
+  if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
     std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key, 
is_slot_id_encoded_);
     if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != 
GetSlotIdFromKey(user_key)) {
       return rocksdb::Status::OK();
@@ -109,7 +109,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
     return rocksdb::Status::OK();
   }
 
-  if (column_family_id == kColumnFamilyIDDefault) {
+  if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
     InternalKey ikey(key, is_slot_id_encoded_);
     user_key = ikey.GetKey().ToString();
     if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != 
GetSlotIdFromKey(user_key)) {
@@ -252,7 +252,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
       default:
         break;
     }
-  } else if (column_family_id == kColumnFamilyIDStream) {
+  } else if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
     auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, 
&command_args);
     if (!s.IsOK()) {
       LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Stream: " << 
s.Msg();
@@ -268,14 +268,14 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
 }
 
 rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const 
Slice &key) {
-  if (column_family_id == kColumnFamilyIDZSetScore) {
+  if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::SecondarySubkey)) {
     return rocksdb::Status::OK();
   }
 
   std::vector<std::string> command_args;
   std::string ns;
 
-  if (column_family_id == kColumnFamilyIDMetadata) {
+  if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
     std::string user_key;
     std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key, 
is_slot_id_encoded_);
 
@@ -284,7 +284,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
     }
 
     command_args = {"DEL", user_key};
-  } else if (column_family_id == kColumnFamilyIDDefault) {
+  } else if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
     InternalKey ikey(key, is_slot_id_encoded_);
     std::string user_key = ikey.GetKey().ToString();
     if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != 
GetSlotIdFromKey(user_key)) {
@@ -376,7 +376,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
       default:
         break;
     }
-  } else if (column_family_id == kColumnFamilyIDStream) {
+  } else if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
     InternalKey ikey(key, is_slot_id_encoded_);
     Slice encoded_id = ikey.GetSubKey();
     redis::StreamEntryID entry_id;
diff --git a/src/storage/compaction_checker.cc 
b/src/storage/compaction_checker.cc
index 55502b2a..649f1f23 100644
--- a/src/storage/compaction_checker.cc
+++ b/src/storage/compaction_checker.cc
@@ -29,18 +29,19 @@
 void CompactionChecker::CompactPropagateAndPubSubFiles() {
   rocksdb::CompactRangeOptions compact_opts;
   compact_opts.change_level = true;
-  std::vector<std::string> cf_names = {engine::kPubSubColumnFamilyName, 
engine::kPropagateColumnFamilyName};
-  for (const auto &cf_name : cf_names) {
-    LOG(INFO) << "[compaction checker] Start the compact the column family: " 
<< cf_name;
-    auto cf_handle = storage_->GetCFHandle(cf_name);
+  for (const auto &cf :
+       {engine::ColumnFamilyConfigs::PubSubColumnFamily(), 
engine::ColumnFamilyConfigs::PropagateColumnFamily()}) {
+    LOG(INFO) << "[compaction checker] Start the compact the column family: " 
<< cf.Name();
+    auto cf_handle = storage_->GetCFHandle(cf.Id());
     auto s = storage_->GetDB()->CompactRange(compact_opts, cf_handle, nullptr, 
nullptr);
-    LOG(INFO) << "[compaction checker] Compact the column family: " << cf_name 
<< " finished, result: " << s.ToString();
+    LOG(INFO) << "[compaction checker] Compact the column family: " << 
cf.Name()
+              << " finished, result: " << s.ToString();
   }
 }
 
-void CompactionChecker::PickCompactionFiles(const std::string &cf_name) {
+void CompactionChecker::PickCompactionFilesForCf(const 
engine::ColumnFamilyConfig &column_family_config) {
   rocksdb::TablePropertiesCollection props;
-  rocksdb::ColumnFamilyHandle *cf = storage_->GetCFHandle(cf_name);
+  rocksdb::ColumnFamilyHandle *cf = 
storage_->GetCFHandle(column_family_config.Id());
   auto s = storage_->GetDB()->GetPropertiesOfAllTables(cf, &props);
   if (!s.ok()) {
     LOG(WARNING) << "[compaction checker] Failed to get table properties, " << 
s.ToString();
diff --git a/src/storage/compaction_checker.h b/src/storage/compaction_checker.h
index 750c3229..f7c260b9 100644
--- a/src/storage/compaction_checker.h
+++ b/src/storage/compaction_checker.h
@@ -30,7 +30,7 @@ class CompactionChecker {
  public:
   explicit CompactionChecker(engine::Storage *storage) : storage_(storage) {}
   ~CompactionChecker() = default;
-  void PickCompactionFiles(const std::string &cf_name);
+  void PickCompactionFilesForCf(const engine::ColumnFamilyConfig &cf_name);
   void CompactPropagateAndPubSubFiles();
 
  private:
diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc
index 6514207b..c65dd3ab 100644
--- a/src/storage/iterator.cc
+++ b/src/storage/iterator.cc
@@ -26,8 +26,10 @@
 
 namespace engine {
 DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options, 
int slot)
-    : storage_(storage), read_options_(std::move(read_options)), slot_(slot) {
-  metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName);
+    : storage_(storage),
+      read_options_(std::move(read_options)),
+      slot_(slot),
+      metadata_cf_handle_(storage_->GetCFHandle(ColumnFamilyID::Metadata)) {
   metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, 
metadata_cf_handle_));
 }
 
@@ -115,9 +117,9 @@ std::unique_ptr<SubKeyIterator> 
DBIterator::GetSubKeyIterator() const {
 SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions 
read_options, RedisType type, std::string prefix)
     : storage_(storage), read_options_(std::move(read_options)), type_(type), 
prefix_(std::move(prefix)) {
   if (type_ == kRedisStream) {
-    cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName);
+    cf_handle_ = storage_->GetCFHandle(ColumnFamilyID::Stream);
   } else {
-    cf_handle_ = storage_->GetCFHandle(kSubkeyColumnFamilyName);
+    cf_handle_ = storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey);
   }
   iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, 
cf_handle_));
 }
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 13f9bd2f..3038a000 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -43,9 +43,10 @@
 
 namespace redis {
 
-Database::Database(engine::Storage *storage, std::string ns) : 
storage_(storage), namespace_(std::move(ns)) {
-  metadata_cf_handle_ = 
storage->GetCFHandle(engine::kMetadataColumnFamilyName);
-}
+Database::Database(engine::Storage *storage, std::string ns)
+    : storage_(storage),
+      metadata_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Metadata)),
+      namespace_(std::move(ns)) {}
 
 // Some data types may support reading multiple types of metadata.
 // For example, bitmap supports reading string metadata and bitmap metadata.
@@ -748,7 +749,7 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   auto subkey_iter = iter.GetSubKeyIterator();
 
   if (subkey_iter != nullptr) {
-    auto zset_score_cf = type == kRedisZSet ? 
storage_->GetCFHandle(engine::kZSetScoreColumnFamilyName) : nullptr;
+    auto zset_score_cf = type == kRedisZSet ? 
storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey) : nullptr;
 
     for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
       InternalKey from_ikey(subkey_iter->Key(), storage_->IsSlotIdEncoded());
diff --git a/src/storage/redis_pubsub.h b/src/storage/redis_pubsub.h
index 995560aa..d6e3d392 100644
--- a/src/storage/redis_pubsub.h
+++ b/src/storage/redis_pubsub.h
@@ -29,7 +29,8 @@ namespace redis {
 
 class PubSub : public Database {
  public:
-  explicit PubSub(engine::Storage *storage) : Database(storage), 
pubsub_cf_handle_(storage->GetCFHandle("pubsub")) {}
+  explicit PubSub(engine::Storage *storage)
+      : Database(storage), 
pubsub_cf_handle_(storage->GetCFHandle(ColumnFamilyID::PubSub)) {}
   rocksdb::Status Publish(const Slice &channel, const Slice &value);
 
  private:
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index a16205a4..409d5e19 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -443,7 +443,7 @@ Status FunctionList(Server *srv, const redis::Connection 
*conn, const std::strin
   rocksdb::Slice upper_bound(end_key);
   read_options.iterate_upper_bound = &upper_bound;
 
-  auto *cf = srv->storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+  auto *cf = srv->storage->GetCFHandle(ColumnFamilyID::Propagate);
   auto iter = util::UniqueIterator(srv->storage, read_options, cf);
   std::vector<std::pair<std::string, std::string>> result;
   for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
@@ -479,7 +479,7 @@ Status FunctionListFunc(Server *srv, const 
redis::Connection *conn, const std::s
   rocksdb::Slice upper_bound(end_key);
   read_options.iterate_upper_bound = &upper_bound;
 
-  auto *cf = srv->storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+  auto *cf = srv->storage->GetCFHandle(ColumnFamilyID::Propagate);
   auto iter = util::UniqueIterator(srv->storage, read_options, cf);
   std::vector<std::pair<std::string, std::string>> result;
   for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
@@ -557,7 +557,7 @@ Status FunctionDelete(Server *srv, const std::string &name) 
{
   }
 
   auto storage = srv->storage;
-  auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+  auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
 
   for (size_t i = 1; i <= lua_objlen(lua, -1); ++i) {
     lua_rawgeti(lua, -1, static_cast<int>(i));
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index b25ae8a8..a25eb9c7 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -236,11 +236,12 @@ Status Storage::CreateColumnFamilies(const 
rocksdb::Options &options) {
   rocksdb::ColumnFamilyOptions cf_options(options);
   auto res = util::DBOpen(options, config_->db_dir);
   if (res) {
-    std::vector<std::string> cf_names = {kMetadataColumnFamilyName, 
kZSetScoreColumnFamilyName,
-                                         kPubSubColumnFamilyName,   
kPropagateColumnFamilyName,
-                                         kStreamColumnFamilyName,   
kSearchColumnFamilyName};
+    std::vector<std::string> cf_names_except_default;
+    for (const auto &cf : 
ColumnFamilyConfigs::ListColumnFamiliesWithoutDefault()) {
+      cf_names_except_default.emplace_back(cf.Name());
+    }
     std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
-    auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
+    auto s = (*res)->CreateColumnFamilies(cf_options, cf_names_except_default, 
&cf_handles);
     if (!s.ok()) {
       return {Status::DBOpenErr, s.ToString()};
     }
@@ -307,7 +308,7 @@ Status Storage::Open(DBOpenMode mode) {
   metadata_opts.memtable_whole_key_filtering = true;
   metadata_opts.memtable_prefix_bloom_size_ratio = 0.1;
   metadata_opts.table_properties_collector_factories.emplace_back(
-      NewCompactOnExpiredTableCollectorFactory(kMetadataColumnFamilyName, 
0.3));
+      
NewCompactOnExpiredTableCollectorFactory(std::string(kMetadataColumnFamilyName),
 0.3));
   SetBlobDB(&metadata_opts);
 
   rocksdb::BlockBasedTableOptions subkey_table_opts = InitTableOptions();
@@ -320,7 +321,7 @@ Status Storage::Open(DBOpenMode mode) {
   subkey_opts.compaction_filter_factory = 
std::make_shared<SubKeyFilterFactory>(this);
   subkey_opts.disable_auto_compactions = 
config_->rocks_db.disable_auto_compactions;
   subkey_opts.table_properties_collector_factories.emplace_back(
-      NewCompactOnExpiredTableCollectorFactory(kSubkeyColumnFamilyName, 0.3));
+      
NewCompactOnExpiredTableCollectorFactory(std::string(kPrimarySubkeyColumnFamilyName),
 0.3));
   SetBlobDB(&subkey_opts);
 
   rocksdb::BlockBasedTableOptions pubsub_table_opts = InitTableOptions();
@@ -340,12 +341,12 @@ Status Storage::Open(DBOpenMode mode) {
   std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
   // Caution: don't change the order of column family, or the handle will be 
mismatched
   column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts);
-  column_families.emplace_back(kMetadataColumnFamilyName, metadata_opts);
-  column_families.emplace_back(kZSetScoreColumnFamilyName, subkey_opts);
-  column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts);
-  column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts);
-  column_families.emplace_back(kStreamColumnFamilyName, subkey_opts);
-  column_families.emplace_back(kSearchColumnFamilyName, subkey_opts);
+  column_families.emplace_back(std::string(kMetadataColumnFamilyName), 
metadata_opts);
+  column_families.emplace_back(std::string(kSecondarySubkeyColumnFamilyName), 
subkey_opts);
+  column_families.emplace_back(std::string(kPubSubColumnFamilyName), 
pubsub_opts);
+  column_families.emplace_back(std::string(kPropagateColumnFamilyName), 
propagate_opts);
+  column_families.emplace_back(std::string(kStreamColumnFamilyName), 
subkey_opts);
+  column_families.emplace_back(std::string(kSearchColumnFamilyName), 
subkey_opts);
 
   std::vector<std::string> old_column_families;
   auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, 
&old_column_families);
@@ -523,7 +524,7 @@ Status Storage::RestoreFromCheckpoint() {
 
 bool Storage::IsEmptyDB() {
   std::unique_ptr<rocksdb::Iterator> iter(
-      db_->NewIterator(DefaultScanOptions(), 
GetCFHandle(kMetadataColumnFamilyName)));
+      db_->NewIterator(DefaultScanOptions(), 
GetCFHandle(ColumnFamilyID::Metadata)));
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     Metadata metadata(kRedisNone, false);
     // If cannot decode the metadata we think the key is alive, so the db is 
not empty
@@ -678,7 +679,7 @@ rocksdb::Status Storage::Delete(const rocksdb::WriteOptions 
&options, rocksdb::C
 
 rocksdb::Status Storage::DeleteRange(const std::string &first_key, const 
std::string &last_key) {
   auto batch = GetWriteBatchBase();
-  rocksdb::ColumnFamilyHandle *cf_handle = 
GetCFHandle(kMetadataColumnFamilyName);
+  rocksdb::ColumnFamilyHandle *cf_handle = 
GetCFHandle(ColumnFamilyID::Metadata);
   auto s = batch->DeleteRange(cf_handle, first_key, last_key);
   if (!s.ok()) {
     return s;
@@ -740,23 +741,6 @@ void Storage::RecordStat(StatType type, uint64_t v) {
   }
 }
 
-rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
-  if (name == kMetadataColumnFamilyName) {
-    return cf_handles_[1];
-  } else if (name == kZSetScoreColumnFamilyName) {
-    return cf_handles_[2];
-  } else if (name == kPubSubColumnFamilyName) {
-    return cf_handles_[3];
-  } else if (name == kPropagateColumnFamilyName) {
-    return cf_handles_[4];
-  } else if (name == kStreamColumnFamilyName) {
-    return cf_handles_[5];
-  } else if (name == kSearchColumnFamilyName) {
-    return cf_handles_[6];
-  }
-  return cf_handles_[0];
-}
-
 rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(ColumnFamilyID id) { return 
cf_handles_[static_cast<size_t>(id)]; }
 
 rocksdb::Status Storage::Compact(rocksdb::ColumnFamilyHandle *cf, const Slice 
*begin, const Slice *end) {
@@ -787,7 +771,7 @@ uint64_t Storage::GetTotalSize(const std::string &ns) {
       rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES | 
rocksdb::DB::SizeApproximationFlags::INCLUDE_MEMTABLES;
 
   for (auto cf_handle : cf_handles_) {
-    if (cf_handle == GetCFHandle(kPubSubColumnFamilyName) || cf_handle == 
GetCFHandle(kPropagateColumnFamilyName)) {
+    if (cf_handle == GetCFHandle(ColumnFamilyID::PubSub) || cf_handle == 
GetCFHandle(ColumnFamilyID::Propagate)) {
       continue;
     }
 
@@ -868,7 +852,7 @@ Status Storage::WriteToPropagateCF(const std::string &key, 
const std::string &va
     return {Status::NotOK, "cannot write to propagate column family in slave 
mode"};
   }
   auto batch = GetWriteBatchBase();
-  auto cf = GetCFHandle(kPropagateColumnFamilyName);
+  auto cf = GetCFHandle(ColumnFamilyID::Propagate);
   batch->Put(cf, key, value);
   auto s = Write(default_write_opts_, batch->GetWriteBatch());
   if (!s.ok()) {
@@ -945,7 +929,7 @@ std::string 
Storage::GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq) {
 
 std::string Storage::GetReplIdFromDbEngine() {
   std::string replid_in_db;
-  auto cf = GetCFHandle(kPropagateColumnFamilyName);
+  auto cf = GetCFHandle(ColumnFamilyID::Propagate);
   auto s = db_->Get(rocksdb::ReadOptions(), cf, kReplicationIdKey, 
&replid_in_db);
   return replid_in_db;
 }
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 09151baa..579ccb72 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -51,31 +51,25 @@ inline constexpr StorageEngineType STORAGE_ENGINE_TYPE = 
StorageEngineType::KVRO
 
 const int kReplIdLength = 16;
 
-enum ColumnFamilyID {
-  kColumnFamilyIDDefault = 0,
-  kColumnFamilyIDMetadata,
-  kColumnFamilyIDZSetScore,
-  kColumnFamilyIDPubSub,
-  kColumnFamilyIDPropagate,
-  kColumnFamilyIDStream,
-  kColumnFamilyIDSearch,
-};
-
 enum DBOpenMode {
   kDBOpenModeDefault,
   kDBOpenModeForReadOnly,
   kDBOpenModeAsSecondaryInstance,
 };
 
-namespace engine {
+enum class ColumnFamilyID : uint32_t {
+  PrimarySubkey = 0,
+  Metadata,
+  SecondarySubkey,
+  PubSub,
+  Propagate,
+  Stream,
+  Search,
+};
 
-constexpr const char *kPubSubColumnFamilyName = "pubsub";
-constexpr const char *kZSetScoreColumnFamilyName = "zset_score";
-constexpr const char *kMetadataColumnFamilyName = "metadata";
-constexpr const char *kSubkeyColumnFamilyName = "default";
-constexpr const char *kPropagateColumnFamilyName = "propagate";
-constexpr const char *kStreamColumnFamilyName = "stream";
-constexpr const char *kSearchColumnFamilyName = "search";
+constexpr uint32_t kMaxColumnFamilyID = 
static_cast<uint32_t>(ColumnFamilyID::Search);
+
+namespace engine {
 
 constexpr const char *kPropagateScriptCommand = "script";
 
@@ -122,6 +116,84 @@ struct DBStats {
   alignas(CACHE_LINE_SIZE) std::atomic<uint_fast64_t> keyspace_misses = 0;
 };
 
+class ColumnFamilyConfig {
+ public:
+  ColumnFamilyConfig(ColumnFamilyID id, std::string_view name, bool is_minor)
+      : id_(id), name_(name), is_minor_(is_minor) {}
+  ColumnFamilyID Id() const { return id_; }
+  std::string_view Name() const { return name_; }
+  bool IsMinor() const { return is_minor_; }
+
+ private:
+  ColumnFamilyID id_;
+  std::string_view name_;
+  bool is_minor_;
+};
+
+constexpr const std::string_view kPrimarySubkeyColumnFamilyName = "default";
+constexpr const std::string_view kMetadataColumnFamilyName = "metadata";
+constexpr const std::string_view kSecondarySubkeyColumnFamilyName = 
"zset_score";
+constexpr const std::string_view kPubSubColumnFamilyName = "pubsub";
+constexpr const std::string_view kPropagateColumnFamilyName = "propagate";
+constexpr const std::string_view kStreamColumnFamilyName = "stream";
+constexpr const std::string_view kSearchColumnFamilyName = "search";
+
+class ColumnFamilyConfigs {
+ public:
+  /// DefaultSubkeyColumnFamily is the default column family in rocksdb.
+  /// In kvrocks, we use it to store the data if metadata is not enough.
+  static ColumnFamilyConfig PrimarySubkeyColumnFamily() {
+    return {ColumnFamilyID::PrimarySubkey, kPrimarySubkeyColumnFamilyName, 
/*is_minor=*/false};
+  }
+
+  /// MetadataColumnFamily stores the metadata of data-structures.
+  static ColumnFamilyConfig MetadataColumnFamily() {
+    return {ColumnFamilyID::Metadata, kMetadataColumnFamilyName, 
/*is_minor=*/false};
+  }
+
+  /// SecondarySubkeyColumnFamily stores the score of zset or other secondary 
subkey.
+  /// See https://kvrocks.apache.org/community/data-structure-on-rocksdb#zset 
for more details.
+  static ColumnFamilyConfig SecondarySubkeyColumnFamily() {
+    return {ColumnFamilyID::SecondarySubkey, kSecondarySubkeyColumnFamilyName,
+            /*is_minor=*/true};
+  }
+
+  /// PubSubColumnFamily stores the pubsub data.
+  static ColumnFamilyConfig PubSubColumnFamily() {
+    return {ColumnFamilyID::PubSub, kPubSubColumnFamilyName, 
/*is_minor=*/true};
+  }
+
+  static ColumnFamilyConfig PropagateColumnFamily() {
+    return {ColumnFamilyID::Propagate, kPropagateColumnFamilyName, 
/*is_minor=*/true};
+  }
+
+  static ColumnFamilyConfig StreamColumnFamily() {
+    return {ColumnFamilyID::Stream, kStreamColumnFamilyName, 
/*is_minor=*/true};
+  }
+
+  static ColumnFamilyConfig SearchColumnFamily() {
+    return {ColumnFamilyID::Search, kSearchColumnFamilyName, 
/*is_minor=*/true};
+  }
+
+  /// ListAllColumnFamilies returns all column families in kvrocks.
+  static const std::vector<ColumnFamilyConfig> &ListAllColumnFamilies() { 
return AllCfs; }
+
+  static const std::vector<ColumnFamilyConfig> 
&ListColumnFamiliesWithoutDefault() { return AllCfsWithoutDefault; }
+
+  static const ColumnFamilyConfig &GetColumnFamily(ColumnFamilyID id) { return 
AllCfs[static_cast<size_t>(id)]; }
+
+ private:
+  // Caution: don't change the order of column family, or the handle will be 
mismatched
+  inline const static std::vector<ColumnFamilyConfig> AllCfs = {
+      PrimarySubkeyColumnFamily(), MetadataColumnFamily(), 
SecondarySubkeyColumnFamily(), PubSubColumnFamily(),
+      PropagateColumnFamily(),     StreamColumnFamily(),   
SearchColumnFamily(),
+  };
+  inline const static std::vector<ColumnFamilyConfig> AllCfsWithoutDefault = {
+      MetadataColumnFamily(),  SecondarySubkeyColumnFamily(), 
PubSubColumnFamily(),
+      PropagateColumnFamily(), StreamColumnFamily(),          
SearchColumnFamily(),
+  };
+};
+
 class Storage {
  public:
   explicit Storage(Config *config);
@@ -180,7 +252,7 @@ class Storage {
   rocksdb::DB *GetDB();
   bool IsClosing() const { return db_closing_; }
   std::string GetName() const { return config_->db_name; }
-  rocksdb::ColumnFamilyHandle *GetCFHandle(const std::string &name);
+  /// Get the column family handle by the column family id.
   rocksdb::ColumnFamilyHandle *GetCFHandle(ColumnFamilyID id);
   std::vector<rocksdb::ColumnFamilyHandle *> *GetCFHandles() { return 
&cf_handles_; }
   LockManager *GetLockManager() { return &lock_mgr_; }
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 2e867a85..e0546ded 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -36,7 +36,7 @@ namespace redis {
 class Stream : public SubKeyScanner {
  public:
   explicit Stream(engine::Storage *storage, const std::string &ns)
-      : SubKeyScanner(storage, ns), 
stream_cf_handle_(storage->GetCFHandle("stream")) {}
+      : SubKeyScanner(storage, ns), 
stream_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Stream)) {}
   rocksdb::Status Add(const Slice &stream_name, const StreamAddOptions 
&options, const std::vector<std::string> &values,
                       StreamEntryID *id);
   rocksdb::Status CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index 0f0d4688..b934f3b8 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -503,8 +503,8 @@ rocksdb::Status String::CAD(const std::string &user_key, 
const std::string &valu
   }
 
   if (value == current_value) {
-    auto delete_status = storage_->Delete(storage_->DefaultWriteOptions(),
-                                          
storage_->GetCFHandle(engine::kMetadataColumnFamilyName), ns_key);
+    auto delete_status =
+        storage_->Delete(storage_->DefaultWriteOptions(), 
storage_->GetCFHandle(ColumnFamilyID::Metadata), ns_key);
     if (!delete_status.ok()) {
       return delete_status;
     }
diff --git a/src/types/redis_zset.h b/src/types/redis_zset.h
index 0c52c5fa..a5ea754a 100644
--- a/src/types/redis_zset.h
+++ b/src/types/redis_zset.h
@@ -90,7 +90,7 @@ namespace redis {
 class ZSet : public SubKeyScanner {
  public:
   explicit ZSet(engine::Storage *storage, const std::string &ns)
-      : SubKeyScanner(storage, ns), 
score_cf_handle_(storage->GetCFHandle(engine::kZSetScoreColumnFamilyName)) {}
+      : SubKeyScanner(storage, ns), 
score_cf_handle_(storage->GetCFHandle(ColumnFamilyID::SecondarySubkey)) {}
 
   using Members = std::vector<std::string>;
   using MemberScores = std::vector<MemberScore>;
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index c5616304..fad4cb13 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -60,17 +60,17 @@ TEST(Compact, Filter) {
   read_options.snapshot = db->GetSnapshot();
   read_options.fill_cache = false;
 
-  auto new_iterator = [db, read_options, &storage](const std::string& name) {
-    return std::unique_ptr<rocksdb::Iterator>(db->NewIterator(read_options, 
storage->GetCFHandle(name)));
+  auto new_iterator = [db, read_options, &storage](ColumnFamilyID 
column_family_id) {
+    return std::unique_ptr<rocksdb::Iterator>(db->NewIterator(read_options, 
storage->GetCFHandle(column_family_id)));
   };
 
-  auto iter = new_iterator("metadata");
+  auto iter = new_iterator(ColumnFamilyID::Metadata);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     auto [user_ns, user_key] = ExtractNamespaceKey(iter->key(), 
storage->IsSlotIdEncoded());
     EXPECT_EQ(user_key.ToString(), live_hash_key);
   }
 
-  iter = new_iterator("subkey");
+  iter = new_iterator(ColumnFamilyID::PrimarySubkey);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     InternalKey ikey(iter->key(), storage->IsSlotIdEncoded());
     EXPECT_EQ(ikey.GetKey().ToString(), live_hash_key);
@@ -85,17 +85,17 @@ TEST(Compact, Filter) {
 
   // Same as the above compact, need to compact twice here
   status = storage->Compact(nullptr, nullptr, nullptr);
-  assert(status.ok());
+  EXPECT_TRUE(status.ok());
   status = storage->Compact(nullptr, nullptr, nullptr);
-  assert(status.ok());
+  EXPECT_TRUE(status.ok());
 
-  iter = new_iterator("default");
+  iter = new_iterator(ColumnFamilyID::PrimarySubkey);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     InternalKey ikey(iter->key(), storage->IsSlotIdEncoded());
     EXPECT_EQ(ikey.GetKey().ToString(), live_hash_key);
   }
 
-  iter = new_iterator("zset_score");
+  iter = new_iterator(ColumnFamilyID::SecondarySubkey);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     EXPECT_TRUE(false);  // never reach here
   }
@@ -107,7 +107,7 @@ TEST(Compact, Filter) {
   int retry = 2;
   while (retry-- > 0) {
     status = storage->Compact(nullptr, nullptr, nullptr);
-    assert(status.ok());
+    ASSERT_TRUE(status.ok());
     std::vector<FieldValue> fieldvalues;
     auto get_res = hash->GetAll(mk_with_ttl, &fieldvalues);
     auto s_expire = hash->Expire(mk_with_ttl, 1);  // expired immediately..
diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc
index ced039e0..4ab41b3a 100644
--- a/tests/cppunit/indexer_test.cc
+++ b/tests/cppunit/indexer_test.cc
@@ -67,7 +67,7 @@ struct IndexerTest : TestBase {
 
 TEST_F(IndexerTest, HashTag) {
   redis::Hash db(storage_.get(), ns);
-  auto cfhandler = storage_->GetCFHandle("search");
+  auto cfhandler = storage_->GetCFHandle(ColumnFamilyID::Search);
 
   {
     auto s = indexer.Record("no_exist", ns);
@@ -174,7 +174,7 @@ TEST_F(IndexerTest, HashTag) {
 
 TEST_F(IndexerTest, JsonTag) {
   redis::Json db(storage_.get(), ns);
-  auto cfhandler = storage_->GetCFHandle("search");
+  auto cfhandler = storage_->GetCFHandle(ColumnFamilyID::Search);
 
   {
     auto s = indexer.Record("no_exist", ns);
diff --git a/tests/cppunit/iterator_test.cc b/tests/cppunit/iterator_test.cc
index 71ebec45..08705d77 100644
--- a/tests/cppunit/iterator_test.cc
+++ b/tests/cppunit/iterator_test.cc
@@ -451,10 +451,10 @@ TEST_F(WALIteratorTest, BasicHash) {
     auto item = iter.Item();
     switch (item.type) {
       case engine::WALItem::Type::kTypePut: {
-        if (item.column_family_id == kColumnFamilyIDDefault) {
+        if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
           InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
           put_fields.emplace_back(internal_key.GetSubKey().ToString());
-        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+        } else if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
           auto [ns, key] = ExtractNamespaceKey(item.key, 
storage_->IsSlotIdEncoded());
           ASSERT_EQ(ns.ToString(), "test_ns1");
           put_keys.emplace_back(key.ToString());
@@ -504,10 +504,10 @@ TEST_F(WALIteratorTest, BasicSet) {
     auto item = iter.Item();
     switch (item.type) {
       case engine::WALItem::Type::kTypePut: {
-        if (item.column_family_id == kColumnFamilyIDDefault) {
+        if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
           InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
           put_members.emplace_back(internal_key.GetSubKey().ToString());
-        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+        } else if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
           auto [ns, key] = ExtractNamespaceKey(item.key, 
storage_->IsSlotIdEncoded());
           ASSERT_EQ(ns.ToString(), "test_ns2");
           put_keys.emplace_back(key.ToString());
@@ -559,10 +559,10 @@ TEST_F(WALIteratorTest, BasicZSet) {
     auto item = iter.Item();
     switch (item.type) {
       case engine::WALItem::Type::kTypePut: {
-        if (item.column_family_id == kColumnFamilyIDDefault) {
+        if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
           InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
           put_members.emplace_back(internal_key.GetSubKey().ToString());
-        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+        } else if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
           auto [ns, key] = ExtractNamespaceKey(item.key, 
storage_->IsSlotIdEncoded());
           ASSERT_EQ(ns.ToString(), "test_ns3");
           put_keys.emplace_back(key.ToString());
@@ -609,9 +609,9 @@ TEST_F(WALIteratorTest, BasicList) {
     auto item = iter.Item();
     switch (item.type) {
       case engine::WALItem::Type::kTypePut: {
-        if (item.column_family_id == kColumnFamilyIDDefault) {
+        if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
           put_values.emplace_back(item.value);
-        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+        } else if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
           auto [ns, key] = ExtractNamespaceKey(item.key, 
storage_->IsSlotIdEncoded());
           ASSERT_EQ(ns.ToString(), "test_ns4");
           put_keys.emplace_back(key.ToString());
@@ -663,12 +663,12 @@ TEST_F(WALIteratorTest, BasicStream) {
     auto item = iter.Item();
     switch (item.type) {
       case engine::WALItem::Type::kTypePut: {
-        if (item.column_family_id == kColumnFamilyIDStream) {
+        if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
           std::vector<std::string> elems;
           auto s = redis::DecodeRawStreamEntryValue(item.value, &elems);
           ASSERT_TRUE(s.IsOK() && !elems.empty());
           put_values.emplace_back(elems[0]);
-        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+        } else if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
           auto [ns, key] = ExtractNamespaceKey(item.key, 
storage_->IsSlotIdEncoded());
           ASSERT_EQ(ns.ToString(), "test_ns5");
           put_keys.emplace_back(key.ToString());
@@ -715,7 +715,7 @@ TEST_F(WALIteratorTest, BasicBitmap) {
     auto item = iter.Item();
     switch (item.type) {
       case engine::WALItem::Type::kTypePut: {
-        if (item.column_family_id == kColumnFamilyIDDefault) {
+        if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
           put_values.emplace_back(item.value);
         }
         break;
@@ -754,7 +754,7 @@ TEST_F(WALIteratorTest, BasicJSON) {
     auto item = iter.Item();
     switch (item.type) {
       case engine::WALItem::Type::kTypePut: {
-        ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata);
+        ASSERT_EQ(item.column_family_id, 
static_cast<uint32_t>(ColumnFamilyID::Metadata));
         auto [ns, key] = ExtractNamespaceKey(item.key, 
storage_->IsSlotIdEncoded());
         ASSERT_EQ(ns.ToString(), "test_ns7");
         put_keys.emplace_back(key.ToString());
@@ -767,7 +767,7 @@ TEST_F(WALIteratorTest, BasicJSON) {
         break;
       }
       case engine::WALItem::Type::kTypeDelete: {
-        ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata);
+        ASSERT_EQ(item.column_family_id, 
static_cast<uint32_t>(ColumnFamilyID::Metadata));
         auto [ns, key] = ExtractNamespaceKey(item.key, 
storage_->IsSlotIdEncoded());
         ASSERT_EQ(ns.ToString(), "test_ns7");
         delete_keys.emplace_back(key.ToString());
@@ -801,7 +801,7 @@ TEST_F(WALIteratorTest, BasicSortedInt) {
     auto item = iter.Item();
     switch (item.type) {
       case engine::WALItem::Type::kTypePut: {
-        if (item.column_family_id == kColumnFamilyIDDefault) {
+        if (item.column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
           const InternalKey internal_key(item.key, 
storage_->IsSlotIdEncoded());
           auto value = DecodeFixed64(internal_key.GetSubKey().data());
           put_values.emplace_back(value);
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index 9d4db5ec..c131f146 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -33,7 +33,7 @@
 
 Status Parser::ParseFullDB() {
   rocksdb::DB *db = storage_->GetDB();
-  rocksdb::ColumnFamilyHandle *metadata_cf_handle = 
storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
+  rocksdb::ColumnFamilyHandle *metadata_cf_handle = 
storage_->GetCFHandle(ColumnFamilyID::Metadata);
   // Due to RSI(Rocksdb Secondary Instance) not supporting "Snapshots based 
read", we don't need to set the snapshot
   // parameter. However, until we proactively invoke TryCatchUpWithPrimary, 
this replica is read-only, which can be
   // considered as a snapshot.

Reply via email to