This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 90e05f19 Refactor Column Family Handling (#2296)
90e05f19 is described below
commit 90e05f19a20cf8a852dd51059a7f5fbc23d6fb76
Author: mwish <[email protected]>
AuthorDate: Tue May 14 19:34:57 2024 +0800
Refactor Column Family Handling (#2296)
---
src/cluster/replication.cc | 7 +-
src/cluster/slot_migrate.cc | 18 +++-
src/search/executors/full_index_scan_executor.h | 3 +-
src/search/executors/numeric_field_scan_executor.h | 3 +-
src/search/executors/tag_field_scan_executor.h | 3 +-
src/search/indexer.cc | 4 +-
src/server/namespace.h | 5 +-
src/server/server.cc | 17 ++--
src/stats/disk_stats.cc | 16 +--
src/storage/batch_extractor.cc | 16 +--
src/storage/compaction_checker.cc | 15 +--
src/storage/compaction_checker.h | 2 +-
src/storage/iterator.cc | 10 +-
src/storage/redis_db.cc | 9 +-
src/storage/redis_pubsub.h | 3 +-
src/storage/scripting.cc | 6 +-
src/storage/storage.cc | 52 ++++------
src/storage/storage.h | 110 +++++++++++++++++----
src/types/redis_stream.h | 2 +-
src/types/redis_string.cc | 4 +-
src/types/redis_zset.h | 2 +-
tests/cppunit/compact_test.cc | 18 ++--
tests/cppunit/indexer_test.cc | 4 +-
tests/cppunit/iterator_test.cc | 28 +++---
utils/kvrocks2redis/parser.cc | 2 +-
25 files changed, 212 insertions(+), 147 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 57d8b9bc..ea9c5587 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -1011,17 +1011,18 @@ bool ReplicationThread::isUnknownOption(const char
*err) { return std::string(er
rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const
rocksdb::Slice &key,
const rocksdb::Slice &value) {
type_ = kBatchTypeNone;
- if (column_family_id == kColumnFamilyIDPubSub) {
+ if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::PubSub)) {
type_ = kBatchTypePublish;
kv_ = std::make_pair(key.ToString(), value.ToString());
return rocksdb::Status::OK();
- } else if (column_family_id == kColumnFamilyIDPropagate) {
+ } else if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Propagate)) {
type_ = kBatchTypePropagate;
kv_ = std::make_pair(key.ToString(), value.ToString());
return rocksdb::Status::OK();
- } else if (column_family_id == kColumnFamilyIDStream) {
+ } else if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
type_ = kBatchTypeStream;
kv_ = std::make_pair(key.ToString(), value.ToString());
+ return rocksdb::Status::OK();
}
return rocksdb::Status::OK();
}
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index a9c466a4..a074d536 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -342,7 +342,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
read_options.snapshot = slot_snapshot_;
Slice prefix_slice(prefix);
read_options.iterate_lower_bound = &prefix_slice;
- rocksdb::ColumnFamilyHandle *cf_handle =
storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
+ rocksdb::ColumnFamilyHandle *cf_handle =
storage_->GetCFHandle(ColumnFamilyID::Metadata);
auto iter =
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
// Seek to the beginning of keys start with 'prefix' and iterate all these
keys
@@ -851,8 +851,8 @@ Status SlotMigrator::migrateStream(const Slice &key, const
StreamMetadata &metad
read_options.iterate_lower_bound = &prefix_key_slice;
// Should use th raw db iterator to avoid reading uncommitted writes in
transaction mode
- auto iter = util::UniqueIterator(
- storage_->GetDB()->NewIterator(read_options,
storage_->GetCFHandle(engine::kStreamColumnFamilyName)));
+ auto iter =
+ util::UniqueIterator(storage_->GetDB()->NewIterator(read_options,
storage_->GetCFHandle(ColumnFamilyID::Stream)));
std::vector<std::string> user_cmd = {type_to_cmd[metadata.Type()],
key.ToString()};
@@ -1224,7 +1224,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {
}
batch_sender.SetPrefixLogData(log_data);
-
GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(engine::kMetadataColumnFamilyName),
iter.Key(), iter.Value()));
+
GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::Metadata),
iter.Key(), iter.Value()));
auto subkey_iter = iter.GetSubKeyIterator();
if (!subkey_iter) {
@@ -1240,7 +1240,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {
score_key.append(subkey_iter->UserKey().ToString());
auto score_key_bytes =
InternalKey(iter.Key(), score_key, internal_key.GetVersion(),
storage_->IsSlotIdEncoded()).Encode();
-
GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(kColumnFamilyIDZSetScore),
score_key_bytes, Slice()));
+
GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey),
score_key_bytes, Slice()));
}
if (batch_sender.IsFull()) {
@@ -1334,11 +1334,19 @@ Status
SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender
break;
}
case engine::WALItem::Type::kTypePut: {
+ if (item.column_family_id > kMaxColumnFamilyID) {
+ LOG(INFO) << fmt::format("[migrate] Invalid put column family id:
{}", item.column_family_id);
+ continue;
+ }
GET_OR_RET(batch_sender->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(item.column_family_id)),
item.key, item.value));
break;
}
case engine::WALItem::Type::kTypeDelete: {
+ if (item.column_family_id > kMaxColumnFamilyID) {
+ LOG(INFO) << fmt::format("[migrate] Invalid delete column family id:
{}", item.column_family_id);
+ continue;
+ }
GET_OR_RET(
batch_sender->Delete(storage_->GetCFHandle(static_cast<ColumnFamilyID>(item.column_family_id)),
item.key));
break;
diff --git a/src/search/executors/full_index_scan_executor.h
b/src/search/executors/full_index_scan_executor.h
index 0afeae04..3fde9ef5 100644
--- a/src/search/executors/full_index_scan_executor.h
+++ b/src/search/executors/full_index_scan_executor.h
@@ -50,8 +50,7 @@ struct FullIndexScanExecutor : ExecutorNode {
if (!iter) {
rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();
- iter = util::UniqueIterator(ctx->storage, read_options,
-
ctx->storage->GetCFHandle(engine::kMetadataColumnFamilyName));
+ iter = util::UniqueIterator(ctx->storage, read_options,
ctx->storage->GetCFHandle(ColumnFamilyID::Metadata));
iter->Seek(ns_key);
}
diff --git a/src/search/executors/numeric_field_scan_executor.h
b/src/search/executors/numeric_field_scan_executor.h
index 1609f433..970ef106 100644
--- a/src/search/executors/numeric_field_scan_executor.h
+++ b/src/search/executors/numeric_field_scan_executor.h
@@ -76,8 +76,7 @@ struct NumericFieldScanExecutor : ExecutorNode {
rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();
- iter =
- util::UniqueIterator(ctx->storage, read_options,
ctx->storage->GetCFHandle(engine::kSearchColumnFamilyName));
+ iter = util::UniqueIterator(ctx->storage, read_options,
ctx->storage->GetCFHandle(ColumnFamilyID::Search));
if (scan->order == SortByClause::ASC) {
iter->Seek(IndexKey(scan->range.l));
} else {
diff --git a/src/search/executors/tag_field_scan_executor.h
b/src/search/executors/tag_field_scan_executor.h
index a3781c11..ddedffc9 100644
--- a/src/search/executors/tag_field_scan_executor.h
+++ b/src/search/executors/tag_field_scan_executor.h
@@ -76,8 +76,7 @@ struct TagFieldScanExecutor : ExecutorNode {
rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();
- iter =
- util::UniqueIterator(ctx->storage, read_options,
ctx->storage->GetCFHandle(engine::kSearchColumnFamilyName));
+ iter = util::UniqueIterator(ctx->storage, read_options,
ctx->storage->GetCFHandle(ColumnFamilyID::Search));
iter->Seek(index_key);
}
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 4a4a949c..4bce72de 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -159,7 +159,7 @@ Status IndexUpdater::UpdateIndex(const std::string &field,
std::string_view key,
}
auto batch = storage->GetWriteBatchBase();
- auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);
+ auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);
for (const auto &tag : tags_to_delete) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
@@ -179,7 +179,7 @@ Status IndexUpdater::UpdateIndex(const std::string &field,
std::string_view key,
if (!s.ok()) return {Status::NotOK, s.ToString()};
} else if (auto numeric [[maybe_unused]] =
dynamic_cast<SearchNumericFieldMetadata *>(metadata)) {
auto batch = storage->GetWriteBatchBase();
- auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);
+ auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);
if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(),
original.end())));
diff --git a/src/server/namespace.h b/src/server/namespace.h
index 943a3ece..c0556eeb 100644
--- a/src/server/namespace.h
+++ b/src/server/namespace.h
@@ -26,9 +26,8 @@ constexpr const char *kNamespaceDBKey = "__namespace_keys__";
class Namespace {
public:
- explicit Namespace(engine::Storage *storage) : storage_(storage) {
- cf_ = storage_->GetCFHandle(engine::kPropagateColumnFamilyName);
- }
+ explicit Namespace(engine::Storage *storage)
+ : storage_(storage),
cf_(storage_->GetCFHandle(ColumnFamilyID::Propagate)) {}
~Namespace() = default;
Namespace(const Namespace &) = delete;
diff --git a/src/server/server.cc b/src/server/server.cc
index 7c50a008..0ebc142c 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -195,10 +195,9 @@ Status Server::Start() {
auto now_hours = util::GetTimeStamp<std::chrono::hours>();
if (now_hours >= config_->compaction_checker_range.start &&
now_hours <= config_->compaction_checker_range.stop) {
- std::vector<std::string> cf_names =
{engine::kMetadataColumnFamilyName, engine::kSubkeyColumnFamilyName,
-
engine::kZSetScoreColumnFamilyName, engine::kStreamColumnFamilyName};
- for (const auto &cf_name : cf_names) {
- compaction_checker.PickCompactionFiles(cf_name);
+ const auto &column_family_list =
engine::ColumnFamilyConfigs::ListAllColumnFamilies();
+ for (auto &column_family : column_family_list) {
+ compaction_checker.PickCompactionFilesForCf(column_family);
}
}
// compact once per day
@@ -850,7 +849,7 @@ void Server::GetRocksDBInfo(std::string *info) {
// All column families share the same block cache, so it's good to count a
single one.
uint64_t block_cache_usage = 0;
uint64_t block_cache_pinned_usage = 0;
- auto subkey_cf_handle =
storage->GetCFHandle(engine::kSubkeyColumnFamilyName);
+ auto subkey_cf_handle =
storage->GetCFHandle(ColumnFamilyID::PrimarySubkey);
db->GetIntProperty(subkey_cf_handle,
rocksdb::DB::Properties::kBlockCacheUsage, &block_cache_usage);
string_stream << "block_cache_usage:" << block_cache_usage << "\r\n";
db->GetIntProperty(subkey_cf_handle,
rocksdb::DB::Properties::kBlockCachePinnedUsage, &block_cache_pinned_usage);
@@ -1651,7 +1650,7 @@ Status Server::ScriptExists(const std::string &sha) {
Status Server::ScriptGet(const std::string &sha, std::string *body) const {
std::string func_name = engine::kLuaFuncSHAPrefix + sha;
- auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+ auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
auto s = storage->Get(rocksdb::ReadOptions(), cf, func_name, body);
if (!s.ok()) {
return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
@@ -1666,7 +1665,7 @@ Status Server::ScriptSet(const std::string &sha, const
std::string &body) const
Status Server::FunctionGetCode(const std::string &lib, std::string *code)
const {
std::string func_name = engine::kLuaLibCodePrefix + lib;
- auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+ auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
auto s = storage->Get(rocksdb::ReadOptions(), cf, func_name, code);
if (!s.ok()) {
return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
@@ -1676,7 +1675,7 @@ Status Server::FunctionGetCode(const std::string &lib,
std::string *code) const
Status Server::FunctionGetLib(const std::string &func, std::string *lib) const
{
std::string func_name = engine::kLuaFuncLibPrefix + func;
- auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+ auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
auto s = storage->Get(rocksdb::ReadOptions(), cf, func_name, lib);
if (!s.ok()) {
return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
@@ -1700,7 +1699,7 @@ void Server::ScriptReset() {
}
Status Server::ScriptFlush() {
- auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+ auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
auto s = storage->FlushScripts(storage->DefaultWriteOptions(), cf);
if (!s.ok()) return {Status::NotOK, s.ToString()};
ScriptReset();
diff --git a/src/stats/disk_stats.cc b/src/stats/disk_stats.cc
index 9eda0950..8c7f98dd 100644
--- a/src/stats/disk_stats.cc
+++ b/src/stats/disk_stats.cc
@@ -79,14 +79,14 @@ rocksdb::Status Disk::GetHashSize(const Slice &ns_key,
uint64_t *key_size) {
HashMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisHash}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
- return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
+ return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size);
}
rocksdb::Status Disk::GetSetSize(const Slice &ns_key, uint64_t *key_size) {
SetMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisSet}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
- return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
+ return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size);
}
rocksdb::Status Disk::GetListSize(const Slice &ns_key, uint64_t *key_size) {
@@ -95,7 +95,7 @@ rocksdb::Status Disk::GetListSize(const Slice &ns_key,
uint64_t *key_size) {
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string buf;
PutFixed64(&buf, metadata.head);
- return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size, buf);
+ return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size, buf);
}
rocksdb::Status Disk::GetZsetSize(const Slice &ns_key, uint64_t *key_size) {
@@ -104,17 +104,17 @@ rocksdb::Status Disk::GetZsetSize(const Slice &ns_key,
uint64_t *key_size) {
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string score_bytes;
PutDouble(&score_bytes, kMinScore);
- s = GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kZSetScoreColumnFamilyName), key_size,
+ s = GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey), key_size,
score_bytes, score_bytes);
if (!s.ok()) return s;
- return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
+ return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size);
}
rocksdb::Status Disk::GetBitmapSize(const Slice &ns_key, uint64_t *key_size) {
BitmapMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisBitmap}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
- return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size,
+ return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size,
std::to_string(0), std::to_string(0));
}
@@ -124,7 +124,7 @@ rocksdb::Status Disk::GetSortedintSize(const Slice &ns_key,
uint64_t *key_size)
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string start_buf;
PutFixed64(&start_buf, 0);
- return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size,
+ return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), key_size,
start_buf, start_buf);
}
@@ -132,7 +132,7 @@ rocksdb::Status Disk::GetStreamSize(const Slice &ns_key,
uint64_t *key_size) {
StreamMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisStream}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
- return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kStreamColumnFamilyName), key_size);
+ return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(ColumnFamilyID::Stream), key_size);
}
} // namespace redis
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index f60669eb..c1559b1a 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -44,14 +44,14 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice
&blob) {
}
rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const
Slice &key, const Slice &value) {
- if (column_family_id == kColumnFamilyIDZSetScore) {
+ if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::SecondarySubkey)) {
return rocksdb::Status::OK();
}
std::string ns, user_key;
std::vector<std::string> command_args;
- if (column_family_id == kColumnFamilyIDMetadata) {
+ if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key,
is_slot_id_encoded_);
if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) !=
GetSlotIdFromKey(user_key)) {
return rocksdb::Status::OK();
@@ -109,7 +109,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
return rocksdb::Status::OK();
}
- if (column_family_id == kColumnFamilyIDDefault) {
+ if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey ikey(key, is_slot_id_encoded_);
user_key = ikey.GetKey().ToString();
if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) !=
GetSlotIdFromKey(user_key)) {
@@ -252,7 +252,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
default:
break;
}
- } else if (column_family_id == kColumnFamilyIDStream) {
+ } else if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value,
&command_args);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Stream: " <<
s.Msg();
@@ -268,14 +268,14 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
}
rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const
Slice &key) {
- if (column_family_id == kColumnFamilyIDZSetScore) {
+ if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::SecondarySubkey)) {
return rocksdb::Status::OK();
}
std::vector<std::string> command_args;
std::string ns;
- if (column_family_id == kColumnFamilyIDMetadata) {
+ if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
std::string user_key;
std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key,
is_slot_id_encoded_);
@@ -284,7 +284,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
}
command_args = {"DEL", user_key};
- } else if (column_family_id == kColumnFamilyIDDefault) {
+ } else if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey ikey(key, is_slot_id_encoded_);
std::string user_key = ikey.GetKey().ToString();
if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) !=
GetSlotIdFromKey(user_key)) {
@@ -376,7 +376,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
default:
break;
}
- } else if (column_family_id == kColumnFamilyIDStream) {
+ } else if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
InternalKey ikey(key, is_slot_id_encoded_);
Slice encoded_id = ikey.GetSubKey();
redis::StreamEntryID entry_id;
diff --git a/src/storage/compaction_checker.cc
b/src/storage/compaction_checker.cc
index 55502b2a..649f1f23 100644
--- a/src/storage/compaction_checker.cc
+++ b/src/storage/compaction_checker.cc
@@ -29,18 +29,19 @@
void CompactionChecker::CompactPropagateAndPubSubFiles() {
rocksdb::CompactRangeOptions compact_opts;
compact_opts.change_level = true;
- std::vector<std::string> cf_names = {engine::kPubSubColumnFamilyName,
engine::kPropagateColumnFamilyName};
- for (const auto &cf_name : cf_names) {
- LOG(INFO) << "[compaction checker] Start the compact the column family: "
<< cf_name;
- auto cf_handle = storage_->GetCFHandle(cf_name);
+ for (const auto &cf :
+ {engine::ColumnFamilyConfigs::PubSubColumnFamily(),
engine::ColumnFamilyConfigs::PropagateColumnFamily()}) {
+ LOG(INFO) << "[compaction checker] Start the compact the column family: "
<< cf.Name();
+ auto cf_handle = storage_->GetCFHandle(cf.Id());
auto s = storage_->GetDB()->CompactRange(compact_opts, cf_handle, nullptr,
nullptr);
- LOG(INFO) << "[compaction checker] Compact the column family: " << cf_name
<< " finished, result: " << s.ToString();
+ LOG(INFO) << "[compaction checker] Compact the column family: " <<
cf.Name()
+ << " finished, result: " << s.ToString();
}
}
-void CompactionChecker::PickCompactionFiles(const std::string &cf_name) {
+void CompactionChecker::PickCompactionFilesForCf(const
engine::ColumnFamilyConfig &column_family_config) {
rocksdb::TablePropertiesCollection props;
- rocksdb::ColumnFamilyHandle *cf = storage_->GetCFHandle(cf_name);
+ rocksdb::ColumnFamilyHandle *cf =
storage_->GetCFHandle(column_family_config.Id());
auto s = storage_->GetDB()->GetPropertiesOfAllTables(cf, &props);
if (!s.ok()) {
LOG(WARNING) << "[compaction checker] Failed to get table properties, " <<
s.ToString();
diff --git a/src/storage/compaction_checker.h b/src/storage/compaction_checker.h
index 750c3229..f7c260b9 100644
--- a/src/storage/compaction_checker.h
+++ b/src/storage/compaction_checker.h
@@ -30,7 +30,7 @@ class CompactionChecker {
public:
explicit CompactionChecker(engine::Storage *storage) : storage_(storage) {}
~CompactionChecker() = default;
- void PickCompactionFiles(const std::string &cf_name);
+ void PickCompactionFilesForCf(const engine::ColumnFamilyConfig &cf_name);
void CompactPropagateAndPubSubFiles();
private:
diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc
index 6514207b..c65dd3ab 100644
--- a/src/storage/iterator.cc
+++ b/src/storage/iterator.cc
@@ -26,8 +26,10 @@
namespace engine {
DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options,
int slot)
- : storage_(storage), read_options_(std::move(read_options)), slot_(slot) {
- metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName);
+ : storage_(storage),
+ read_options_(std::move(read_options)),
+ slot_(slot),
+ metadata_cf_handle_(storage_->GetCFHandle(ColumnFamilyID::Metadata)) {
metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_,
metadata_cf_handle_));
}
@@ -115,9 +117,9 @@ std::unique_ptr<SubKeyIterator>
DBIterator::GetSubKeyIterator() const {
SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions
read_options, RedisType type, std::string prefix)
: storage_(storage), read_options_(std::move(read_options)), type_(type),
prefix_(std::move(prefix)) {
if (type_ == kRedisStream) {
- cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName);
+ cf_handle_ = storage_->GetCFHandle(ColumnFamilyID::Stream);
} else {
- cf_handle_ = storage_->GetCFHandle(kSubkeyColumnFamilyName);
+ cf_handle_ = storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey);
}
iter_ = util::UniqueIterator(storage_->NewIterator(read_options_,
cf_handle_));
}
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 13f9bd2f..3038a000 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -43,9 +43,10 @@
namespace redis {
-Database::Database(engine::Storage *storage, std::string ns) :
storage_(storage), namespace_(std::move(ns)) {
- metadata_cf_handle_ =
storage->GetCFHandle(engine::kMetadataColumnFamilyName);
-}
+Database::Database(engine::Storage *storage, std::string ns)
+ : storage_(storage),
+ metadata_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Metadata)),
+ namespace_(std::move(ns)) {}
// Some data types may support reading multiple types of metadata.
// For example, bitmap supports reading string metadata and bitmap metadata.
@@ -748,7 +749,7 @@ rocksdb::Status Database::Copy(const std::string &key,
const std::string &new_ke
auto subkey_iter = iter.GetSubKeyIterator();
if (subkey_iter != nullptr) {
- auto zset_score_cf = type == kRedisZSet ?
storage_->GetCFHandle(engine::kZSetScoreColumnFamilyName) : nullptr;
+ auto zset_score_cf = type == kRedisZSet ?
storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey) : nullptr;
for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
InternalKey from_ikey(subkey_iter->Key(), storage_->IsSlotIdEncoded());
diff --git a/src/storage/redis_pubsub.h b/src/storage/redis_pubsub.h
index 995560aa..d6e3d392 100644
--- a/src/storage/redis_pubsub.h
+++ b/src/storage/redis_pubsub.h
@@ -29,7 +29,8 @@ namespace redis {
class PubSub : public Database {
public:
- explicit PubSub(engine::Storage *storage) : Database(storage),
pubsub_cf_handle_(storage->GetCFHandle("pubsub")) {}
+ explicit PubSub(engine::Storage *storage)
+ : Database(storage),
pubsub_cf_handle_(storage->GetCFHandle(ColumnFamilyID::PubSub)) {}
rocksdb::Status Publish(const Slice &channel, const Slice &value);
private:
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index a16205a4..409d5e19 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -443,7 +443,7 @@ Status FunctionList(Server *srv, const redis::Connection
*conn, const std::strin
rocksdb::Slice upper_bound(end_key);
read_options.iterate_upper_bound = &upper_bound;
- auto *cf = srv->storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+ auto *cf = srv->storage->GetCFHandle(ColumnFamilyID::Propagate);
auto iter = util::UniqueIterator(srv->storage, read_options, cf);
std::vector<std::pair<std::string, std::string>> result;
for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
@@ -479,7 +479,7 @@ Status FunctionListFunc(Server *srv, const
redis::Connection *conn, const std::s
rocksdb::Slice upper_bound(end_key);
read_options.iterate_upper_bound = &upper_bound;
- auto *cf = srv->storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+ auto *cf = srv->storage->GetCFHandle(ColumnFamilyID::Propagate);
auto iter = util::UniqueIterator(srv->storage, read_options, cf);
std::vector<std::pair<std::string, std::string>> result;
for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
@@ -557,7 +557,7 @@ Status FunctionDelete(Server *srv, const std::string &name)
{
}
auto storage = srv->storage;
- auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
+ auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
for (size_t i = 1; i <= lua_objlen(lua, -1); ++i) {
lua_rawgeti(lua, -1, static_cast<int>(i));
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index b25ae8a8..a25eb9c7 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -236,11 +236,12 @@ Status Storage::CreateColumnFamilies(const
rocksdb::Options &options) {
rocksdb::ColumnFamilyOptions cf_options(options);
auto res = util::DBOpen(options, config_->db_dir);
if (res) {
- std::vector<std::string> cf_names = {kMetadataColumnFamilyName,
kZSetScoreColumnFamilyName,
- kPubSubColumnFamilyName,
kPropagateColumnFamilyName,
- kStreamColumnFamilyName,
kSearchColumnFamilyName};
+ std::vector<std::string> cf_names_except_default;
+ for (const auto &cf :
ColumnFamilyConfigs::ListColumnFamiliesWithoutDefault()) {
+ cf_names_except_default.emplace_back(cf.Name());
+ }
std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
- auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
+ auto s = (*res)->CreateColumnFamilies(cf_options, cf_names_except_default,
&cf_handles);
if (!s.ok()) {
return {Status::DBOpenErr, s.ToString()};
}
@@ -307,7 +308,7 @@ Status Storage::Open(DBOpenMode mode) {
metadata_opts.memtable_whole_key_filtering = true;
metadata_opts.memtable_prefix_bloom_size_ratio = 0.1;
metadata_opts.table_properties_collector_factories.emplace_back(
- NewCompactOnExpiredTableCollectorFactory(kMetadataColumnFamilyName,
0.3));
+
NewCompactOnExpiredTableCollectorFactory(std::string(kMetadataColumnFamilyName),
0.3));
SetBlobDB(&metadata_opts);
rocksdb::BlockBasedTableOptions subkey_table_opts = InitTableOptions();
@@ -320,7 +321,7 @@ Status Storage::Open(DBOpenMode mode) {
subkey_opts.compaction_filter_factory =
std::make_shared<SubKeyFilterFactory>(this);
subkey_opts.disable_auto_compactions =
config_->rocks_db.disable_auto_compactions;
subkey_opts.table_properties_collector_factories.emplace_back(
- NewCompactOnExpiredTableCollectorFactory(kSubkeyColumnFamilyName, 0.3));
+
NewCompactOnExpiredTableCollectorFactory(std::string(kPrimarySubkeyColumnFamilyName),
0.3));
SetBlobDB(&subkey_opts);
rocksdb::BlockBasedTableOptions pubsub_table_opts = InitTableOptions();
@@ -340,12 +341,12 @@ Status Storage::Open(DBOpenMode mode) {
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
// Caution: don't change the order of column family, or the handle will be
mismatched
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts);
- column_families.emplace_back(kMetadataColumnFamilyName, metadata_opts);
- column_families.emplace_back(kZSetScoreColumnFamilyName, subkey_opts);
- column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts);
- column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts);
- column_families.emplace_back(kStreamColumnFamilyName, subkey_opts);
- column_families.emplace_back(kSearchColumnFamilyName, subkey_opts);
+ column_families.emplace_back(std::string(kMetadataColumnFamilyName),
metadata_opts);
+ column_families.emplace_back(std::string(kSecondarySubkeyColumnFamilyName),
subkey_opts);
+ column_families.emplace_back(std::string(kPubSubColumnFamilyName),
pubsub_opts);
+ column_families.emplace_back(std::string(kPropagateColumnFamilyName),
propagate_opts);
+ column_families.emplace_back(std::string(kStreamColumnFamilyName),
subkey_opts);
+ column_families.emplace_back(std::string(kSearchColumnFamilyName),
subkey_opts);
std::vector<std::string> old_column_families;
auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir,
&old_column_families);
@@ -523,7 +524,7 @@ Status Storage::RestoreFromCheckpoint() {
bool Storage::IsEmptyDB() {
std::unique_ptr<rocksdb::Iterator> iter(
- db_->NewIterator(DefaultScanOptions(),
GetCFHandle(kMetadataColumnFamilyName)));
+ db_->NewIterator(DefaultScanOptions(),
GetCFHandle(ColumnFamilyID::Metadata)));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Metadata metadata(kRedisNone, false);
// If cannot decode the metadata we think the key is alive, so the db is
not empty
@@ -678,7 +679,7 @@ rocksdb::Status Storage::Delete(const rocksdb::WriteOptions
&options, rocksdb::C
rocksdb::Status Storage::DeleteRange(const std::string &first_key, const
std::string &last_key) {
auto batch = GetWriteBatchBase();
- rocksdb::ColumnFamilyHandle *cf_handle =
GetCFHandle(kMetadataColumnFamilyName);
+ rocksdb::ColumnFamilyHandle *cf_handle =
GetCFHandle(ColumnFamilyID::Metadata);
auto s = batch->DeleteRange(cf_handle, first_key, last_key);
if (!s.ok()) {
return s;
@@ -740,23 +741,6 @@ void Storage::RecordStat(StatType type, uint64_t v) {
}
}
-rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
- if (name == kMetadataColumnFamilyName) {
- return cf_handles_[1];
- } else if (name == kZSetScoreColumnFamilyName) {
- return cf_handles_[2];
- } else if (name == kPubSubColumnFamilyName) {
- return cf_handles_[3];
- } else if (name == kPropagateColumnFamilyName) {
- return cf_handles_[4];
- } else if (name == kStreamColumnFamilyName) {
- return cf_handles_[5];
- } else if (name == kSearchColumnFamilyName) {
- return cf_handles_[6];
- }
- return cf_handles_[0];
-}
-
rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(ColumnFamilyID id) { return
cf_handles_[static_cast<size_t>(id)]; }
rocksdb::Status Storage::Compact(rocksdb::ColumnFamilyHandle *cf, const Slice
*begin, const Slice *end) {
@@ -787,7 +771,7 @@ uint64_t Storage::GetTotalSize(const std::string &ns) {
rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES |
rocksdb::DB::SizeApproximationFlags::INCLUDE_MEMTABLES;
for (auto cf_handle : cf_handles_) {
- if (cf_handle == GetCFHandle(kPubSubColumnFamilyName) || cf_handle ==
GetCFHandle(kPropagateColumnFamilyName)) {
+ if (cf_handle == GetCFHandle(ColumnFamilyID::PubSub) || cf_handle ==
GetCFHandle(ColumnFamilyID::Propagate)) {
continue;
}
@@ -868,7 +852,7 @@ Status Storage::WriteToPropagateCF(const std::string &key,
const std::string &va
return {Status::NotOK, "cannot write to propagate column family in slave
mode"};
}
auto batch = GetWriteBatchBase();
- auto cf = GetCFHandle(kPropagateColumnFamilyName);
+ auto cf = GetCFHandle(ColumnFamilyID::Propagate);
batch->Put(cf, key, value);
auto s = Write(default_write_opts_, batch->GetWriteBatch());
if (!s.ok()) {
@@ -945,7 +929,7 @@ std::string
Storage::GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq) {
std::string Storage::GetReplIdFromDbEngine() {
std::string replid_in_db;
- auto cf = GetCFHandle(kPropagateColumnFamilyName);
+ auto cf = GetCFHandle(ColumnFamilyID::Propagate);
auto s = db_->Get(rocksdb::ReadOptions(), cf, kReplicationIdKey,
&replid_in_db);
return replid_in_db;
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 09151baa..579ccb72 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -51,31 +51,25 @@ inline constexpr StorageEngineType STORAGE_ENGINE_TYPE =
StorageEngineType::KVRO
const int kReplIdLength = 16;
-enum ColumnFamilyID {
- kColumnFamilyIDDefault = 0,
- kColumnFamilyIDMetadata,
- kColumnFamilyIDZSetScore,
- kColumnFamilyIDPubSub,
- kColumnFamilyIDPropagate,
- kColumnFamilyIDStream,
- kColumnFamilyIDSearch,
-};
-
enum DBOpenMode {
kDBOpenModeDefault,
kDBOpenModeForReadOnly,
kDBOpenModeAsSecondaryInstance,
};
-namespace engine {
+enum class ColumnFamilyID : uint32_t {
+ PrimarySubkey = 0,
+ Metadata,
+ SecondarySubkey,
+ PubSub,
+ Propagate,
+ Stream,
+ Search,
+};
-constexpr const char *kPubSubColumnFamilyName = "pubsub";
-constexpr const char *kZSetScoreColumnFamilyName = "zset_score";
-constexpr const char *kMetadataColumnFamilyName = "metadata";
-constexpr const char *kSubkeyColumnFamilyName = "default";
-constexpr const char *kPropagateColumnFamilyName = "propagate";
-constexpr const char *kStreamColumnFamilyName = "stream";
-constexpr const char *kSearchColumnFamilyName = "search";
+constexpr uint32_t kMaxColumnFamilyID =
static_cast<uint32_t>(ColumnFamilyID::Search);
+
+namespace engine {
constexpr const char *kPropagateScriptCommand = "script";
@@ -122,6 +116,84 @@ struct DBStats {
alignas(CACHE_LINE_SIZE) std::atomic<uint_fast64_t> keyspace_misses = 0;
};
+class ColumnFamilyConfig {
+ public:
+ ColumnFamilyConfig(ColumnFamilyID id, std::string_view name, bool is_minor)
+ : id_(id), name_(name), is_minor_(is_minor) {}
+ ColumnFamilyID Id() const { return id_; }
+ std::string_view Name() const { return name_; }
+ bool IsMinor() const { return is_minor_; }
+
+ private:
+ ColumnFamilyID id_;
+ std::string_view name_;
+ bool is_minor_;
+};
+
+constexpr const std::string_view kPrimarySubkeyColumnFamilyName = "default";
+constexpr const std::string_view kMetadataColumnFamilyName = "metadata";
+constexpr const std::string_view kSecondarySubkeyColumnFamilyName =
"zset_score";
+constexpr const std::string_view kPubSubColumnFamilyName = "pubsub";
+constexpr const std::string_view kPropagateColumnFamilyName = "propagate";
+constexpr const std::string_view kStreamColumnFamilyName = "stream";
+constexpr const std::string_view kSearchColumnFamilyName = "search";
+
+class ColumnFamilyConfigs {
+ public:
+ /// DefaultSubkeyColumnFamily is the default column family in rocksdb.
+ /// In kvrocks, we use it to store the data if metadata is not enough.
+ static ColumnFamilyConfig PrimarySubkeyColumnFamily() {
+ return {ColumnFamilyID::PrimarySubkey, kPrimarySubkeyColumnFamilyName,
/*is_minor=*/false};
+ }
+
+ /// MetadataColumnFamily stores the metadata of data-structures.
+ static ColumnFamilyConfig MetadataColumnFamily() {
+ return {ColumnFamilyID::Metadata, kMetadataColumnFamilyName,
/*is_minor=*/false};
+ }
+
+ /// SecondarySubkeyColumnFamily stores the score of zset or other secondary
subkey.
+ /// See https://kvrocks.apache.org/community/data-structure-on-rocksdb#zset
for more details.
+ static ColumnFamilyConfig SecondarySubkeyColumnFamily() {
+ return {ColumnFamilyID::SecondarySubkey, kSecondarySubkeyColumnFamilyName,
+ /*is_minor=*/true};
+ }
+
+ /// PubSubColumnFamily stores the pubsub data.
+ static ColumnFamilyConfig PubSubColumnFamily() {
+ return {ColumnFamilyID::PubSub, kPubSubColumnFamilyName,
/*is_minor=*/true};
+ }
+
+ static ColumnFamilyConfig PropagateColumnFamily() {
+ return {ColumnFamilyID::Propagate, kPropagateColumnFamilyName,
/*is_minor=*/true};
+ }
+
+ static ColumnFamilyConfig StreamColumnFamily() {
+ return {ColumnFamilyID::Stream, kStreamColumnFamilyName,
/*is_minor=*/true};
+ }
+
+ static ColumnFamilyConfig SearchColumnFamily() {
+ return {ColumnFamilyID::Search, kSearchColumnFamilyName,
/*is_minor=*/true};
+ }
+
+ /// ListAllColumnFamilies returns all column families in kvrocks.
+ static const std::vector<ColumnFamilyConfig> &ListAllColumnFamilies() {
return AllCfs; }
+
+ static const std::vector<ColumnFamilyConfig>
&ListColumnFamiliesWithoutDefault() { return AllCfsWithoutDefault; }
+
+ static const ColumnFamilyConfig &GetColumnFamily(ColumnFamilyID id) { return
AllCfs[static_cast<size_t>(id)]; }
+
+ private:
+ // Caution: don't change the order of column family, or the handle will be
mismatched
+ inline const static std::vector<ColumnFamilyConfig> AllCfs = {
+ PrimarySubkeyColumnFamily(), MetadataColumnFamily(),
SecondarySubkeyColumnFamily(), PubSubColumnFamily(),
+ PropagateColumnFamily(), StreamColumnFamily(),
SearchColumnFamily(),
+ };
+ inline const static std::vector<ColumnFamilyConfig> AllCfsWithoutDefault = {
+ MetadataColumnFamily(), SecondarySubkeyColumnFamily(),
PubSubColumnFamily(),
+ PropagateColumnFamily(), StreamColumnFamily(),
SearchColumnFamily(),
+ };
+};
+
class Storage {
public:
explicit Storage(Config *config);
@@ -180,7 +252,7 @@ class Storage {
rocksdb::DB *GetDB();
bool IsClosing() const { return db_closing_; }
std::string GetName() const { return config_->db_name; }
- rocksdb::ColumnFamilyHandle *GetCFHandle(const std::string &name);
+ /// Get the column family handle by the column family id.
rocksdb::ColumnFamilyHandle *GetCFHandle(ColumnFamilyID id);
std::vector<rocksdb::ColumnFamilyHandle *> *GetCFHandles() { return
&cf_handles_; }
LockManager *GetLockManager() { return &lock_mgr_; }
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 2e867a85..e0546ded 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -36,7 +36,7 @@ namespace redis {
class Stream : public SubKeyScanner {
public:
explicit Stream(engine::Storage *storage, const std::string &ns)
- : SubKeyScanner(storage, ns),
stream_cf_handle_(storage->GetCFHandle("stream")) {}
+ : SubKeyScanner(storage, ns),
stream_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Stream)) {}
rocksdb::Status Add(const Slice &stream_name, const StreamAddOptions
&options, const std::vector<std::string> &values,
StreamEntryID *id);
rocksdb::Status CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index 0f0d4688..b934f3b8 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -503,8 +503,8 @@ rocksdb::Status String::CAD(const std::string &user_key,
const std::string &valu
}
if (value == current_value) {
- auto delete_status = storage_->Delete(storage_->DefaultWriteOptions(),
-
storage_->GetCFHandle(engine::kMetadataColumnFamilyName), ns_key);
+ auto delete_status =
+ storage_->Delete(storage_->DefaultWriteOptions(),
storage_->GetCFHandle(ColumnFamilyID::Metadata), ns_key);
if (!delete_status.ok()) {
return delete_status;
}
diff --git a/src/types/redis_zset.h b/src/types/redis_zset.h
index 0c52c5fa..a5ea754a 100644
--- a/src/types/redis_zset.h
+++ b/src/types/redis_zset.h
@@ -90,7 +90,7 @@ namespace redis {
class ZSet : public SubKeyScanner {
public:
explicit ZSet(engine::Storage *storage, const std::string &ns)
- : SubKeyScanner(storage, ns),
score_cf_handle_(storage->GetCFHandle(engine::kZSetScoreColumnFamilyName)) {}
+ : SubKeyScanner(storage, ns),
score_cf_handle_(storage->GetCFHandle(ColumnFamilyID::SecondarySubkey)) {}
using Members = std::vector<std::string>;
using MemberScores = std::vector<MemberScore>;
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index c5616304..fad4cb13 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -60,17 +60,17 @@ TEST(Compact, Filter) {
read_options.snapshot = db->GetSnapshot();
read_options.fill_cache = false;
- auto new_iterator = [db, read_options, &storage](const std::string& name) {
- return std::unique_ptr<rocksdb::Iterator>(db->NewIterator(read_options,
storage->GetCFHandle(name)));
+ auto new_iterator = [db, read_options, &storage](ColumnFamilyID
column_family_id) {
+ return std::unique_ptr<rocksdb::Iterator>(db->NewIterator(read_options,
storage->GetCFHandle(column_family_id)));
};
- auto iter = new_iterator("metadata");
+ auto iter = new_iterator(ColumnFamilyID::Metadata);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
auto [user_ns, user_key] = ExtractNamespaceKey(iter->key(),
storage->IsSlotIdEncoded());
EXPECT_EQ(user_key.ToString(), live_hash_key);
}
- iter = new_iterator("subkey");
+ iter = new_iterator(ColumnFamilyID::PrimarySubkey);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
InternalKey ikey(iter->key(), storage->IsSlotIdEncoded());
EXPECT_EQ(ikey.GetKey().ToString(), live_hash_key);
@@ -85,17 +85,17 @@ TEST(Compact, Filter) {
// Same as the above compact, need to compact twice here
status = storage->Compact(nullptr, nullptr, nullptr);
- assert(status.ok());
+ EXPECT_TRUE(status.ok());
status = storage->Compact(nullptr, nullptr, nullptr);
- assert(status.ok());
+ EXPECT_TRUE(status.ok());
- iter = new_iterator("default");
+ iter = new_iterator(ColumnFamilyID::PrimarySubkey);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
InternalKey ikey(iter->key(), storage->IsSlotIdEncoded());
EXPECT_EQ(ikey.GetKey().ToString(), live_hash_key);
}
- iter = new_iterator("zset_score");
+ iter = new_iterator(ColumnFamilyID::SecondarySubkey);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
EXPECT_TRUE(false); // never reach here
}
@@ -107,7 +107,7 @@ TEST(Compact, Filter) {
int retry = 2;
while (retry-- > 0) {
status = storage->Compact(nullptr, nullptr, nullptr);
- assert(status.ok());
+ ASSERT_TRUE(status.ok());
std::vector<FieldValue> fieldvalues;
auto get_res = hash->GetAll(mk_with_ttl, &fieldvalues);
auto s_expire = hash->Expire(mk_with_ttl, 1); // expired immediately..
diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc
index ced039e0..4ab41b3a 100644
--- a/tests/cppunit/indexer_test.cc
+++ b/tests/cppunit/indexer_test.cc
@@ -67,7 +67,7 @@ struct IndexerTest : TestBase {
TEST_F(IndexerTest, HashTag) {
redis::Hash db(storage_.get(), ns);
- auto cfhandler = storage_->GetCFHandle("search");
+ auto cfhandler = storage_->GetCFHandle(ColumnFamilyID::Search);
{
auto s = indexer.Record("no_exist", ns);
@@ -174,7 +174,7 @@ TEST_F(IndexerTest, HashTag) {
TEST_F(IndexerTest, JsonTag) {
redis::Json db(storage_.get(), ns);
- auto cfhandler = storage_->GetCFHandle("search");
+ auto cfhandler = storage_->GetCFHandle(ColumnFamilyID::Search);
{
auto s = indexer.Record("no_exist", ns);
diff --git a/tests/cppunit/iterator_test.cc b/tests/cppunit/iterator_test.cc
index 71ebec45..08705d77 100644
--- a/tests/cppunit/iterator_test.cc
+++ b/tests/cppunit/iterator_test.cc
@@ -451,10 +451,10 @@ TEST_F(WALIteratorTest, BasicHash) {
auto item = iter.Item();
switch (item.type) {
case engine::WALItem::Type::kTypePut: {
- if (item.column_family_id == kColumnFamilyIDDefault) {
+ if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
put_fields.emplace_back(internal_key.GetSubKey().ToString());
- } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+ } else if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
auto [ns, key] = ExtractNamespaceKey(item.key,
storage_->IsSlotIdEncoded());
ASSERT_EQ(ns.ToString(), "test_ns1");
put_keys.emplace_back(key.ToString());
@@ -504,10 +504,10 @@ TEST_F(WALIteratorTest, BasicSet) {
auto item = iter.Item();
switch (item.type) {
case engine::WALItem::Type::kTypePut: {
- if (item.column_family_id == kColumnFamilyIDDefault) {
+ if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
put_members.emplace_back(internal_key.GetSubKey().ToString());
- } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+ } else if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
auto [ns, key] = ExtractNamespaceKey(item.key,
storage_->IsSlotIdEncoded());
ASSERT_EQ(ns.ToString(), "test_ns2");
put_keys.emplace_back(key.ToString());
@@ -559,10 +559,10 @@ TEST_F(WALIteratorTest, BasicZSet) {
auto item = iter.Item();
switch (item.type) {
case engine::WALItem::Type::kTypePut: {
- if (item.column_family_id == kColumnFamilyIDDefault) {
+ if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
put_members.emplace_back(internal_key.GetSubKey().ToString());
- } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+ } else if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
auto [ns, key] = ExtractNamespaceKey(item.key,
storage_->IsSlotIdEncoded());
ASSERT_EQ(ns.ToString(), "test_ns3");
put_keys.emplace_back(key.ToString());
@@ -609,9 +609,9 @@ TEST_F(WALIteratorTest, BasicList) {
auto item = iter.Item();
switch (item.type) {
case engine::WALItem::Type::kTypePut: {
- if (item.column_family_id == kColumnFamilyIDDefault) {
+ if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
put_values.emplace_back(item.value);
- } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+ } else if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
auto [ns, key] = ExtractNamespaceKey(item.key,
storage_->IsSlotIdEncoded());
ASSERT_EQ(ns.ToString(), "test_ns4");
put_keys.emplace_back(key.ToString());
@@ -663,12 +663,12 @@ TEST_F(WALIteratorTest, BasicStream) {
auto item = iter.Item();
switch (item.type) {
case engine::WALItem::Type::kTypePut: {
- if (item.column_family_id == kColumnFamilyIDStream) {
+ if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
std::vector<std::string> elems;
auto s = redis::DecodeRawStreamEntryValue(item.value, &elems);
ASSERT_TRUE(s.IsOK() && !elems.empty());
put_values.emplace_back(elems[0]);
- } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+ } else if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
auto [ns, key] = ExtractNamespaceKey(item.key,
storage_->IsSlotIdEncoded());
ASSERT_EQ(ns.ToString(), "test_ns5");
put_keys.emplace_back(key.ToString());
@@ -715,7 +715,7 @@ TEST_F(WALIteratorTest, BasicBitmap) {
auto item = iter.Item();
switch (item.type) {
case engine::WALItem::Type::kTypePut: {
- if (item.column_family_id == kColumnFamilyIDDefault) {
+ if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
put_values.emplace_back(item.value);
}
break;
@@ -754,7 +754,7 @@ TEST_F(WALIteratorTest, BasicJSON) {
auto item = iter.Item();
switch (item.type) {
case engine::WALItem::Type::kTypePut: {
- ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata);
+ ASSERT_EQ(item.column_family_id,
static_cast<uint32_t>(ColumnFamilyID::Metadata));
auto [ns, key] = ExtractNamespaceKey(item.key,
storage_->IsSlotIdEncoded());
ASSERT_EQ(ns.ToString(), "test_ns7");
put_keys.emplace_back(key.ToString());
@@ -767,7 +767,7 @@ TEST_F(WALIteratorTest, BasicJSON) {
break;
}
case engine::WALItem::Type::kTypeDelete: {
- ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata);
+ ASSERT_EQ(item.column_family_id,
static_cast<uint32_t>(ColumnFamilyID::Metadata));
auto [ns, key] = ExtractNamespaceKey(item.key,
storage_->IsSlotIdEncoded());
ASSERT_EQ(ns.ToString(), "test_ns7");
delete_keys.emplace_back(key.ToString());
@@ -801,7 +801,7 @@ TEST_F(WALIteratorTest, BasicSortedInt) {
auto item = iter.Item();
switch (item.type) {
case engine::WALItem::Type::kTypePut: {
- if (item.column_family_id == kColumnFamilyIDDefault) {
+ if (item.column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
const InternalKey internal_key(item.key,
storage_->IsSlotIdEncoded());
auto value = DecodeFixed64(internal_key.GetSubKey().data());
put_values.emplace_back(value);
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index 9d4db5ec..c131f146 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -33,7 +33,7 @@
Status Parser::ParseFullDB() {
rocksdb::DB *db = storage_->GetDB();
- rocksdb::ColumnFamilyHandle *metadata_cf_handle =
storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
+ rocksdb::ColumnFamilyHandle *metadata_cf_handle =
storage_->GetCFHandle(ColumnFamilyID::Metadata);
// Due to RSI(Rocksdb Secondary Instance) not supporting "Snapshots based
read", we don't need to set the snapshot
// parameter. However, until we proactively invoke TryCatchUpWithPrimary,
this replica is read-only, which can be
// considered as a snapshot.