This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 9879fa32 Add snapshot option for Geting Metadata (#2174)
9879fa32 is described below
commit 9879fa325531ecabf5a275064b15530b6cea3f2d
Author: mwish <[email protected]>
AuthorDate: Tue Mar 19 15:27:00 2024 +0800
Add snapshot option for Geting Metadata (#2174)
Co-authored-by: hulk <[email protected]>
---
src/search/indexer.cc | 2 +-
src/stats/disk_stats.cc | 14 ++++++-------
src/storage/redis_db.cc | 32 ++++++++++++++---------------
src/storage/redis_db.h | 46 +++++++++++++++++++++++++++++++++++++-----
src/storage/redis_metadata.h | 2 +-
src/types/redis_bitmap.cc | 36 +++++++++++++++++++--------------
src/types/redis_bitmap.h | 3 ++-
src/types/redis_bloom_chain.cc | 13 ++++++------
src/types/redis_bloom_chain.h | 3 ++-
src/types/redis_geo.cc | 2 +-
src/types/redis_hash.cc | 32 ++++++++++++++---------------
src/types/redis_hash.h | 2 +-
src/types/redis_json.cc | 4 ++--
src/types/redis_list.cc | 34 +++++++++++++++----------------
src/types/redis_list.h | 2 +-
src/types/redis_set.cc | 22 +++++++++++---------
src/types/redis_set.h | 2 +-
src/types/redis_sortedint.cc | 23 +++++++++++----------
src/types/redis_sortedint.h | 2 +-
src/types/redis_stream.cc | 39 ++++++++++++++++++-----------------
src/types/redis_stream.h | 2 +-
src/types/redis_string.cc | 2 +-
src/types/redis_zset.cc | 38 +++++++++++++++++-----------------
src/types/redis_zset.h | 2 +-
tests/cppunit/metadata_test.cc | 2 +-
25 files changed, 204 insertions(+), 157 deletions(-)
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index c082133e..bb4e88c1 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -38,7 +38,7 @@ StatusOr<FieldValueRetriever>
FieldValueRetriever::Create(SearchOnDataType type,
Hash db(storage, ns);
std::string ns_key = db.AppendNamespacePrefix(key);
HashMetadata metadata(false);
- auto s = db.GetMetadata(ns_key, &metadata);
+ auto s = db.GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok()) return {Status::NotOK, s.ToString()};
return FieldValueRetriever(db, metadata, key);
} else if (type == SearchOnDataType::JSON) {
diff --git a/src/stats/disk_stats.cc b/src/stats/disk_stats.cc
index 7c3c9998..9eda0950 100644
--- a/src/stats/disk_stats.cc
+++ b/src/stats/disk_stats.cc
@@ -77,21 +77,21 @@ rocksdb::Status Disk::GetStringSize(const Slice &ns_key,
uint64_t *key_size) {
rocksdb::Status Disk::GetHashSize(const Slice &ns_key, uint64_t *key_size) {
HashMetadata metadata(false);
- rocksdb::Status s = Database::GetMetadata({kRedisHash}, ns_key, &metadata);
+ rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisHash}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
}
rocksdb::Status Disk::GetSetSize(const Slice &ns_key, uint64_t *key_size) {
SetMetadata metadata(false);
- rocksdb::Status s = Database::GetMetadata({kRedisSet}, ns_key, &metadata);
+ rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisSet}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
}
rocksdb::Status Disk::GetListSize(const Slice &ns_key, uint64_t *key_size) {
ListMetadata metadata(false);
- rocksdb::Status s = Database::GetMetadata({kRedisList}, ns_key, &metadata);
+ rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisList}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string buf;
PutFixed64(&buf, metadata.head);
@@ -100,7 +100,7 @@ rocksdb::Status Disk::GetListSize(const Slice &ns_key,
uint64_t *key_size) {
rocksdb::Status Disk::GetZsetSize(const Slice &ns_key, uint64_t *key_size) {
ZSetMetadata metadata(false);
- rocksdb::Status s = Database::GetMetadata({kRedisZSet}, ns_key, &metadata);
+ rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisZSet}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string score_bytes;
PutDouble(&score_bytes, kMinScore);
@@ -112,7 +112,7 @@ rocksdb::Status Disk::GetZsetSize(const Slice &ns_key,
uint64_t *key_size) {
rocksdb::Status Disk::GetBitmapSize(const Slice &ns_key, uint64_t *key_size) {
BitmapMetadata metadata(false);
- rocksdb::Status s = Database::GetMetadata({kRedisBitmap}, ns_key, &metadata);
+ rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisBitmap}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size,
std::to_string(0), std::to_string(0));
@@ -120,7 +120,7 @@ rocksdb::Status Disk::GetBitmapSize(const Slice &ns_key,
uint64_t *key_size) {
rocksdb::Status Disk::GetSortedintSize(const Slice &ns_key, uint64_t
*key_size) {
SortedintMetadata metadata(false);
- rocksdb::Status s = Database::GetMetadata({kRedisSortedint}, ns_key,
&metadata);
+ rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisSortedint}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string start_buf;
PutFixed64(&start_buf, 0);
@@ -130,7 +130,7 @@ rocksdb::Status Disk::GetSortedintSize(const Slice &ns_key,
uint64_t *key_size)
rocksdb::Status Disk::GetStreamSize(const Slice &ns_key, uint64_t *key_size) {
StreamMetadata metadata(false);
- rocksdb::Status s = Database::GetMetadata({kRedisStream}, ns_key, &metadata);
+ rocksdb::Status s = Database::GetMetadata(Database::GetOptions{},
{kRedisStream}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key,
storage_->GetCFHandle(engine::kStreamColumnFamilyName), key_size);
}
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 63e8ef57..e75a19b0 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -84,25 +84,25 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types,
Slice *bytes, Metadata
return s;
}
-rocksdb::Status Database::GetMetadata(RedisTypes types, const Slice &ns_key,
Metadata *metadata) {
+rocksdb::Status Database::GetMetadata(GetOptions options, RedisTypes types,
const Slice &ns_key, Metadata *metadata) {
std::string raw_value;
Slice rest;
- return GetMetadata(types, ns_key, &raw_value, metadata, &rest);
+ return GetMetadata(options, types, ns_key, &raw_value, metadata, &rest);
}
-rocksdb::Status Database::GetMetadata(RedisTypes types, const Slice &ns_key,
std::string *raw_value, Metadata *metadata,
- Slice *rest) {
- auto s = GetRawMetadata(ns_key, raw_value);
+rocksdb::Status Database::GetMetadata(GetOptions options, RedisTypes types,
const Slice &ns_key, std::string *raw_value,
+ Metadata *metadata, Slice *rest) {
+ auto s = GetRawMetadata(options, ns_key, raw_value);
*rest = *raw_value;
if (!s.ok()) return s;
return ParseMetadata(types, rest, metadata);
}
-rocksdb::Status Database::GetRawMetadata(const Slice &ns_key, std::string
*bytes) {
- LatestSnapShot ss(storage_);
- rocksdb::ReadOptions read_options;
- read_options.snapshot = ss.GetSnapShot();
- return storage_->Get(read_options, metadata_cf_handle_, ns_key, bytes);
+rocksdb::Status Database::GetRawMetadata(GetOptions options, const Slice
&ns_key, std::string *bytes) {
+ rocksdb::ReadOptions opts;
+ // If options.snapshot == nullptr, we can avoid allocating a snapshot here.
+ opts.snapshot = options.snapshot;
+ return storage_->Get(opts, metadata_cf_handle_, ns_key, bytes);
}
rocksdb::Status Database::Expire(const Slice &user_key, uint64_t timestamp) {
@@ -247,7 +247,7 @@ rocksdb::Status Database::TTL(const Slice &user_key,
int64_t *ttl) {
rocksdb::Status Database::GetExpireTime(const Slice &user_key, uint64_t
*timestamp) {
std::string ns_key = AppendNamespacePrefix(user_key);
Metadata metadata(kRedisNone, false);
- auto s = GetMetadata(RedisTypes::All(), ns_key, &metadata);
+ auto s = GetMetadata(GetOptions{}, RedisTypes::All(), ns_key, &metadata);
if (!s.ok()) return s;
*timestamp = metadata.expire;
@@ -517,7 +517,7 @@ rocksdb::Status Database::Dump(const Slice &user_key,
std::vector<std::string> *
if (metadata.Type() == kRedisList) {
ListMetadata list_metadata(false);
- s = GetMetadata({kRedisList}, ns_key, &list_metadata);
+ s = GetMetadata(GetOptions{}, {kRedisList}, ns_key, &list_metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
infos->emplace_back("head");
infos->emplace_back(std::to_string(list_metadata.head));
@@ -618,8 +618,7 @@ rocksdb::Status Database::ClearKeysOfSlot(const
rocksdb::Slice &ns, int slot) {
rocksdb::Status Database::KeyExist(const std::string &key) {
int cnt = 0;
- std::vector<rocksdb::Slice> keys;
- keys.emplace_back(key);
+ std::vector<rocksdb::Slice> keys{key};
auto s = Exists(keys, &cnt);
if (!s.ok()) {
return s;
@@ -636,12 +635,11 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const
Slice &user_key, const
uint64_t cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
Metadata metadata(type, false);
- rocksdb::Status s = GetMetadata({type}, ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
{type}, ns_key, &metadata);
if (!s.ok()) return s;
- LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- read_options.snapshot = ss.GetSnapShot();
auto iter = util::UniqueIterator(storage_, read_options);
std::string match_prefix_key =
InternalKey(ns_key, subkey_prefix, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 0627b651..493b83c3 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -29,16 +29,53 @@
#include "storage.h"
namespace redis {
+
+/// Database is a wrapper of underlying storage engine, it provides
+/// some common operations for redis commands.
class Database {
public:
static constexpr uint64_t RANDOM_KEY_SCAN_LIMIT = 60;
+ struct GetOptions {
+ // If snapshot is not nullptr, read from the specified snapshot,
+ // otherwise read from the "latest" snapshot.
+ const rocksdb::Snapshot *snapshot = nullptr;
+ };
+
explicit Database(engine::Storage *storage, std::string ns = "");
+ /// Parsing metadata with type of `types` from bytes, the metadata is a base
class of all metadata.
+ /// When parsing, the bytes will be consumed.
[[nodiscard]] rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes,
Metadata *metadata);
- [[nodiscard]] rocksdb::Status GetMetadata(RedisTypes types, const Slice
&ns_key, Metadata *metadata);
- [[nodiscard]] rocksdb::Status GetMetadata(RedisTypes types, const Slice
&ns_key, std::string *raw_value,
- Metadata *metadata, Slice *rest);
- [[nodiscard]] rocksdb::Status GetRawMetadata(const Slice &ns_key,
std::string *bytes);
+ /// GetMetadata is a helper function to get metadata from the database. It
will read the "raw metadata"
+ /// from underlying storage, and then parse the raw metadata to the
specified metadata type.
+ ///
+ /// \param options The read options, including whether uses a snapshot
during reading the metadata.
+ /// \param types The candidate types of the metadata.
+ /// \param ns_key The key with namespace of the metadata.
+ /// \param metadata The output metadata.
+ [[nodiscard]] rocksdb::Status GetMetadata(GetOptions options, RedisTypes
types, const Slice &ns_key,
+ Metadata *metadata);
+ /// GetMetadata is a helper function to get metadata from the database. It
will read the "raw metadata"
+ /// from underlying storage, and then parse the raw metadata to the
specified metadata type.
+ ///
+ /// Compared with the above function, this function will also return the
rest of the bytes
+ /// after parsing the metadata.
+ ///
+ /// \param options The read options, including whether uses a snapshot
during reading the metadata.
+ /// \param types The candidate types of the metadata.
+ /// \param ns_key The key with namespace of the metadata.
+ /// \param raw_value Holding the raw metadata.
+ /// \param metadata The output metadata.
+ /// \param rest The rest of the bytes after parsing the metadata.
+ [[nodiscard]] rocksdb::Status GetMetadata(GetOptions options, RedisTypes
types, const Slice &ns_key,
+ std::string *raw_value, Metadata
*metadata, Slice *rest);
+ /// GetRawMetadata is a helper function to get the "raw metadata" from the
database without parsing
+ /// it to the specified metadata type.
+ ///
+ /// \param options The read options, including whether uses a snapshot
during reading the metadata.
+ /// \param ns_key The key with namespace of the metadata.
+ /// \param bytes The output raw metadata.
+ [[nodiscard]] rocksdb::Status GetRawMetadata(GetOptions options, const Slice
&ns_key, std::string *bytes);
[[nodiscard]] rocksdb::Status Expire(const Slice &user_key, uint64_t
timestamp);
[[nodiscard]] rocksdb::Status Del(const Slice &user_key);
[[nodiscard]] rocksdb::Status MDel(const std::vector<Slice> &keys, uint64_t
*deleted_cnt);
@@ -70,7 +107,6 @@ class Database {
friend class LatestSnapShot;
};
-
class LatestSnapShot {
public:
explicit LatestSnapShot(engine::Storage *storage) : storage_(storage),
snapshot_(storage_->GetDB()->GetSnapshot()) {}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index dac2d0e1..531a8803 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -65,7 +65,7 @@ struct RedisTypes {
return RedisTypes(types);
}
- bool Contains(RedisType type) { return types_[type]; }
+ bool Contains(RedisType type) const { return types_[type]; }
private:
using UnderlyingType = std::bitset<128>;
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index ac3d8768..555fb139 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -93,8 +93,9 @@ uint32_t SegmentSubKeyIndexForBit(uint32_t bit_offset) {
return (bit_offset / kBitmapSegmentBits) * kBitmapSegmentBytes;
}
-rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key, BitmapMetadata
*metadata, std::string *raw_value) {
- auto s = GetRawMetadata(ns_key, raw_value);
+rocksdb::Status Bitmap::GetMetadata(Database::GetOptions get_options, const
Slice &ns_key, BitmapMetadata *metadata,
+ std::string *raw_value) {
+ auto s = GetRawMetadata(get_options, ns_key, raw_value);
if (!s.ok()) return s;
Slice slice = *raw_value;
@@ -107,7 +108,8 @@ rocksdb::Status Bitmap::GetBit(const Slice &user_key,
uint32_t bit_offset, bool
std::string ns_key = AppendNamespacePrefix(user_key);
BitmapMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata, &raw_value);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata, &raw_value);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
if (metadata.Type() == kRedisString) {
@@ -115,7 +117,6 @@ rocksdb::Status Bitmap::GetBit(const Slice &user_key,
uint32_t bit_offset, bool
return bitmap_string_db.GetBit(raw_value, bit_offset, bit);
}
- LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
rocksdb::PinnableSlice value;
@@ -142,7 +143,8 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key,
const uint32_t max_btos
std::string ns_key = AppendNamespacePrefix(user_key);
BitmapMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata, &raw_value);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata, &raw_value);
if (!s.ok()) return s;
if (metadata.size > max_btos_size) {
return rocksdb::Status::Aborted(kErrBitmapStringOutOfRange);
@@ -152,7 +154,6 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key,
const uint32_t max_btos
std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
auto iter = util::UniqueIterator(storage_, read_options);
@@ -184,7 +185,7 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key,
uint32_t bit_offset, bool
LockGuard guard(storage_->GetLockManager(), ns_key);
BitmapMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata, &raw_value);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
if (metadata.Type() == kRedisString) {
@@ -228,7 +229,8 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key,
int64_t start, int64_t s
std::string ns_key = AppendNamespacePrefix(user_key);
BitmapMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata, &raw_value);
+ std::optional<LatestSnapShot> ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss->GetSnapShot()},
ns_key, &metadata, &raw_value);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
/* Convert negative indexes */
@@ -237,6 +239,9 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key,
int64_t start, int64_t s
}
if (metadata.Type() == kRedisString) {
+ // Release snapshot ahead for performance, this requires
+ // `bitmap_string_db` doesn't get anything.
+ ss = std::nullopt;
redis::BitmapString bitmap_string_db(storage_, namespace_);
return bitmap_string_db.BitCount(raw_value, start, stop, is_bit_index,
cnt);
}
@@ -257,9 +262,8 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key,
int64_t start, int64_t s
auto u_start = static_cast<uint32_t>(start_byte);
auto u_stop = static_cast<uint32_t>(stop_byte);
- LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
- read_options.snapshot = ss.GetSnapShot();
+ read_options.snapshot = ss->GetSnapShot();
uint32_t start_index = u_start / kBitmapSegmentBytes;
uint32_t stop_index = u_stop / kBitmapSegmentBytes;
// Don't use multi get to prevent large range query, and take too much memory
@@ -308,7 +312,8 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool
bit, int64_t start, i
std::string ns_key = AppendNamespacePrefix(user_key);
BitmapMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata, &raw_value);
+ std::optional<LatestSnapShot> ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss->GetSnapShot()},
ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
if (s.IsNotFound()) {
*pos = bit ? -1 : 0;
@@ -316,6 +321,7 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool
bit, int64_t start, i
}
if (metadata.Type() == kRedisString) {
+ ss = std::nullopt;
redis::BitmapString bitmap_string_db(storage_, namespace_);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given,
pos);
}
@@ -335,9 +341,8 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool
bit, int64_t start, i
return -1;
};
- LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
- read_options.snapshot = ss.GetSnapShot();
+ read_options.snapshot = ss->GetSnapShot();
uint32_t start_index = u_start / kBitmapSegmentBytes;
uint32_t stop_index = u_stop / kBitmapSegmentBytes;
// Don't use multi get to prevent large range query, and take too much memory
@@ -417,7 +422,7 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const
std::string &op_name, co
for (const auto &op_key : op_keys) {
BitmapMetadata metadata(false);
std::string ns_op_key = AppendNamespacePrefix(op_key);
- auto s = GetMetadata(ns_op_key, &metadata, &raw_value);
+ auto s = GetMetadata(GetOptions{}, ns_op_key, &metadata, &raw_value);
if (!s.ok()) {
if (s.IsNotFound()) {
continue;
@@ -769,7 +774,8 @@ rocksdb::Status Bitmap::bitfield(const Slice &user_key,
const std::vector<Bitfie
BitmapMetadata metadata;
std::string raw_value;
- auto s = GetMetadata(ns_key, &metadata, &raw_value);
+ // TODO(mwish): maintain snapshot for read-only bitfield.
+ auto s = GetMetadata(GetOptions{}, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
diff --git a/src/types/redis_bitmap.h b/src/types/redis_bitmap.h
index 349982d8..047a33fd 100644
--- a/src/types/redis_bitmap.h
+++ b/src/types/redis_bitmap.h
@@ -72,7 +72,8 @@ class Bitmap : public Database {
std::vector<std::optional<BitfieldValue>> *rets);
static bool bitfieldWriteAheadLog(const
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
const std::vector<BitfieldOperation> &ops);
- rocksdb::Status GetMetadata(const Slice &ns_key, BitmapMetadata *metadata,
std::string *raw_value);
+ rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice
&ns_key, BitmapMetadata *metadata,
+ std::string *raw_value);
template <bool ReadOnly>
static rocksdb::Status runBitfieldOperationsWithCache(SegmentCacheStore
&cache,
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
index 90a43f05..6a4f3860 100644
--- a/src/types/redis_bloom_chain.cc
+++ b/src/types/redis_bloom_chain.cc
@@ -24,8 +24,9 @@
namespace redis {
-rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key,
BloomChainMetadata *metadata) {
- return Database::GetMetadata({kRedisBloomFilter}, ns_key, metadata);
+rocksdb::Status BloomChain::getBloomChainMetadata(Database::GetOptions
get_options, const Slice &ns_key,
+ BloomChainMetadata
*metadata) {
+ return Database::GetMetadata(get_options, {kRedisBloomFilter}, ns_key,
metadata);
}
std::string BloomChain::getBFKey(const Slice &ns_key, const BloomChainMetadata
&metadata, uint16_t filters_index) {
@@ -124,7 +125,7 @@ rocksdb::Status BloomChain::Reserve(const Slice &user_key,
uint32_t capacity, do
LockGuard guard(storage_->GetLockManager(), ns_key);
BloomChainMetadata bloom_chain_metadata;
- rocksdb::Status s = getBloomChainMetadata(ns_key, &bloom_chain_metadata);
+ rocksdb::Status s = getBloomChainMetadata(GetOptions{}, ns_key,
&bloom_chain_metadata);
if (!s.ok() && !s.IsNotFound()) return s;
if (!s.IsNotFound()) {
return rocksdb::Status::InvalidArgument("the key already exists");
@@ -153,7 +154,7 @@ rocksdb::Status BloomChain::InsertCommon(const Slice
&user_key, const std::vecto
LockGuard guard(storage_->GetLockManager(), ns_key);
BloomChainMetadata metadata;
- rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
+ rocksdb::Status s = getBloomChainMetadata(GetOptions{}, ns_key, &metadata);
if (s.IsNotFound() && insert_options.auto_create) {
s = createBloomChain(ns_key, insert_options.error_rate,
insert_options.capacity, insert_options.expansion,
@@ -235,7 +236,7 @@ rocksdb::Status BloomChain::MExists(const Slice &user_key,
const std::vector<std
std::string ns_key = AppendNamespacePrefix(user_key);
BloomChainMetadata metadata;
- rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
+ rocksdb::Status s = getBloomChainMetadata(Database::GetOptions{}, ns_key,
&metadata);
if (s.IsNotFound()) {
std::fill(exists->begin(), exists->end(), false);
return rocksdb::Status::OK();
@@ -269,7 +270,7 @@ rocksdb::Status BloomChain::Info(const Slice &user_key,
BloomFilterInfo *info) {
std::string ns_key = AppendNamespacePrefix(user_key);
BloomChainMetadata metadata;
- rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
+ rocksdb::Status s = getBloomChainMetadata(Database::GetOptions{}, ns_key,
&metadata);
if (!s.ok()) return s;
info->capacity = metadata.GetCapacity();
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
index 59f4b5b7..6f5f76da 100644
--- a/src/types/redis_bloom_chain.h
+++ b/src/types/redis_bloom_chain.h
@@ -74,7 +74,8 @@ class BloomChain : public Database {
rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);
private:
- rocksdb::Status getBloomChainMetadata(const Slice &ns_key,
BloomChainMetadata *metadata);
+ rocksdb::Status getBloomChainMetadata(Database::GetOptions get_options,
const Slice &ns_key,
+ BloomChainMetadata *metadata);
std::string getBFKey(const Slice &ns_key, const BloomChainMetadata
&metadata, uint16_t filters_index);
void getBFKeyList(const Slice &ns_key, const BloomChainMetadata &metadata,
std::vector<std::string> *bf_key_list);
rocksdb::Status getBFDataList(const std::vector<std::string> &bf_key_list,
diff --git a/src/types/redis_geo.cc b/src/types/redis_geo.cc
index 37adc35d..966e082f 100644
--- a/src/types/redis_geo.cc
+++ b/src/types/redis_geo.cc
@@ -128,7 +128,7 @@ rocksdb::Status Geo::SearchStore(const Slice &user_key,
GeoShape geo_shape, Orig
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = ZSet::GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = ZSet::GetMetadata(GetOptions{}, ns_key, &metadata);
// store key is not empty, try to remove it before returning.
if (!s.ok() && s.IsNotFound() && !store_key.empty()) {
auto del_s = ZSet::Del(store_key);
diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc
index dcb1978e..40e5a424 100644
--- a/src/types/redis_hash.cc
+++ b/src/types/redis_hash.cc
@@ -34,8 +34,8 @@
namespace redis {
-rocksdb::Status Hash::GetMetadata(const Slice &ns_key, HashMetadata *metadata)
{
- return Database::GetMetadata({kRedisHash}, ns_key, metadata);
+rocksdb::Status Hash::GetMetadata(Database::GetOptions get_options, const
Slice &ns_key, HashMetadata *metadata) {
+ return Database::GetMetadata(get_options, {kRedisHash}, ns_key, metadata);
}
rocksdb::Status Hash::Size(const Slice &user_key, uint64_t *size) {
@@ -43,7 +43,7 @@ rocksdb::Status Hash::Size(const Slice &user_key, uint64_t
*size) {
std::string ns_key = AppendNamespacePrefix(user_key);
HashMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
*size = metadata.size;
return rocksdb::Status::OK();
@@ -52,9 +52,9 @@ rocksdb::Status Hash::Size(const Slice &user_key, uint64_t
*size) {
rocksdb::Status Hash::Get(const Slice &user_key, const Slice &field,
std::string *value) {
std::string ns_key = AppendNamespacePrefix(user_key);
HashMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
- if (!s.ok()) return s;
LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(Database::GetOptions{.snapshot =
ss.GetSnapShot()}, ns_key, &metadata);
+ if (!s.ok()) return s;
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string sub_key = InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
@@ -69,7 +69,7 @@ rocksdb::Status Hash::IncrBy(const Slice &user_key, const
Slice &field, int64_t
LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
std::string sub_key = InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
@@ -116,7 +116,7 @@ rocksdb::Status Hash::IncrByFloat(const Slice &user_key,
const Slice &field, dou
LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
std::string sub_key = InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
@@ -159,12 +159,12 @@ rocksdb::Status Hash::MGet(const Slice &user_key, const
std::vector<Slice> &fiel
std::string ns_key = AppendNamespacePrefix(user_key);
HashMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) {
return s;
}
- LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
read_options.snapshot = ss.GetSnapShot();
std::vector<rocksdb::Slice> keys;
@@ -205,7 +205,7 @@ rocksdb::Status Hash::Delete(const Slice &user_key, const
std::vector<Slice> &fi
WriteBatchLogData log_data(kRedisHash);
batch->PutLogData(log_data.Encode());
LockGuard guard(storage_->GetLockManager(), ns_key);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string value;
@@ -238,7 +238,7 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const
std::vector<FieldValue>
LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
int added = 0;
@@ -290,7 +290,8 @@ rocksdb::Status Hash::RangeByLex(const Slice &user_key,
const RangeLexSpec &spec
}
std::string ns_key = AppendNamespacePrefix(user_key);
HashMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string start_member = spec.reversed ? spec.max : spec.min;
@@ -299,7 +300,6 @@ rocksdb::Status Hash::RangeByLex(const Slice &user_key,
const RangeLexSpec &spec
std::string next_version_prefix_key =
InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
@@ -346,7 +346,8 @@ rocksdb::Status Hash::GetAll(const Slice &user_key,
std::vector<FieldValue> *fie
std::string ns_key = AppendNamespacePrefix(user_key);
HashMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
@@ -354,7 +355,6 @@ rocksdb::Status Hash::GetAll(const Slice &user_key,
std::vector<FieldValue> *fie
InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
@@ -387,7 +387,7 @@ rocksdb::Status Hash::RandField(const Slice &user_key,
int64_t command_count, st
std::string ns_key = AppendNamespacePrefix(user_key);
HashMetadata metadata(/*generate_version=*/false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
std::vector<FieldValue> samples;
diff --git a/src/types/redis_hash.h b/src/types/redis_hash.h
index 8ae0a066..10fc7d54 100644
--- a/src/types/redis_hash.h
+++ b/src/types/redis_hash.h
@@ -65,7 +65,7 @@ class Hash : public SubKeyScanner {
HashFetchType type = HashFetchType::kOnlyKey);
private:
- rocksdb::Status GetMetadata(const Slice &ns_key, HashMetadata *metadata);
+ rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice
&ns_key, HashMetadata *metadata);
friend struct FieldValueRetriever;
};
diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc
index fb3add5f..dbf04563 100644
--- a/src/types/redis_json.cc
+++ b/src/types/redis_json.cc
@@ -74,7 +74,7 @@ rocksdb::Status Json::read(const Slice &ns_key, JsonMetadata
*metadata, JsonValu
std::string bytes;
Slice rest;
- auto s = GetMetadata({kRedisJson}, ns_key, &bytes, metadata, &rest);
+ auto s = GetMetadata(GetOptions{}, {kRedisJson}, ns_key, &bytes, metadata,
&rest);
if (!s.ok()) return s;
return parse(*metadata, rest, value);
@@ -105,7 +105,7 @@ rocksdb::Status Json::Info(const std::string &user_key,
JsonStorageFormat *stora
Slice rest;
JsonMetadata metadata;
- auto s = GetMetadata({kRedisJson}, ns_key, &bytes, &metadata, &rest);
+ auto s = GetMetadata(GetOptions{}, {kRedisJson}, ns_key, &bytes, &metadata,
&rest);
if (!s.ok()) return s;
*storage_format = metadata.format;
diff --git a/src/types/redis_list.cc b/src/types/redis_list.cc
index ed67007e..96b90c44 100644
--- a/src/types/redis_list.cc
+++ b/src/types/redis_list.cc
@@ -27,8 +27,8 @@
namespace redis {
-rocksdb::Status List::GetMetadata(const Slice &ns_key, ListMetadata *metadata)
{
- return Database::GetMetadata({kRedisList}, ns_key, metadata);
+rocksdb::Status List::GetMetadata(Database::GetOptions get_options, const
Slice &ns_key, ListMetadata *metadata) {
+ return Database::GetMetadata(get_options, {kRedisList}, ns_key, metadata);
}
rocksdb::Status List::Size(const Slice &user_key, uint64_t *size) {
@@ -36,7 +36,7 @@ rocksdb::Status List::Size(const Slice &user_key, uint64_t
*size) {
std::string ns_key = AppendNamespacePrefix(user_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
*size = metadata.size;
return rocksdb::Status::OK();
@@ -61,7 +61,7 @@ rocksdb::Status List::push(const Slice &user_key, const
std::vector<Slice> &elem
WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)});
batch->PutLogData(log_data.Encode());
LockGuard guard(storage_->GetLockManager(), ns_key);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !(create_if_missing && s.IsNotFound())) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
@@ -105,7 +105,7 @@ rocksdb::Status List::PopMulti(const rocksdb::Slice
&user_key, bool left, uint32
LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
auto batch = storage_->GetWriteBatchBase();
@@ -169,7 +169,7 @@ rocksdb::Status List::Rem(const Slice &user_key, int count,
const Slice &elem, u
LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
uint64_t index = count >= 0 ? metadata.head : metadata.tail - 1;
@@ -259,7 +259,7 @@ rocksdb::Status List::Insert(const Slice &user_key, const
Slice &pivot, const Sl
LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
std::string buf;
@@ -334,14 +334,14 @@ rocksdb::Status List::Index(const Slice &user_key, int
index, std::string *elem)
std::string ns_key = AppendNamespacePrefix(user_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s;
if (index < 0) index += static_cast<int>(metadata.size);
if (index < 0 || index >= static_cast<int>(metadata.size)) return
rocksdb::Status::NotFound();
rocksdb::ReadOptions read_options;
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
std::string buf;
PutFixed64(&buf, metadata.head + index);
@@ -359,7 +359,8 @@ rocksdb::Status List::Range(const Slice &user_key, int
start, int stop, std::vec
std::string ns_key = AppendNamespacePrefix(user_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
if (start < 0) start = static_cast<int>(metadata.size) + start;
@@ -374,7 +375,6 @@ rocksdb::Status List::Range(const Slice &user_key, int
start, int stop, std::vec
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version +
1, storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;
@@ -398,7 +398,7 @@ rocksdb::Status List::Pos(const Slice &user_key, const
Slice &elem, const PosSpe
std::string ns_key = AppendNamespacePrefix(user_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
// A negative rank means start from the tail.
@@ -454,7 +454,7 @@ rocksdb::Status List::Set(const Slice &user_key, int index,
Slice elem) {
LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
if (index < 0) index += static_cast<int>(metadata.size);
if (index < 0 || index >= static_cast<int>(metadata.size)) {
@@ -490,7 +490,7 @@ rocksdb::Status List::lmoveOnSingleList(const
rocksdb::Slice &src, bool src_left
LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) {
return s;
}
@@ -553,13 +553,13 @@ rocksdb::Status List::lmoveOnTwoLists(const
rocksdb::Slice &src, const rocksdb::
std::vector<std::string> lock_keys{src_ns_key, dst_ns_key};
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
ListMetadata src_metadata(false);
- auto s = GetMetadata(src_ns_key, &src_metadata);
+ auto s = GetMetadata(GetOptions{}, src_ns_key, &src_metadata);
if (!s.ok()) {
return s;
}
ListMetadata dst_metadata(false);
- s = GetMetadata(dst_ns_key, &dst_metadata);
+ s = GetMetadata(GetOptions{}, dst_ns_key, &dst_metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
@@ -615,7 +615,7 @@ rocksdb::Status List::Trim(const Slice &user_key, int
start, int stop) {
LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
if (start < 0) start += static_cast<int>(metadata.size);
diff --git a/src/types/redis_list.h b/src/types/redis_list.h
index cf0effd8..d5e6c7a1 100644
--- a/src/types/redis_list.h
+++ b/src/types/redis_list.h
@@ -56,7 +56,7 @@ class List : public Database {
rocksdb::Status Pos(const Slice &user_key, const Slice &elem, const PosSpec
&spec, std::vector<int64_t> *indexes);
private:
- rocksdb::Status GetMetadata(const Slice &ns_key, ListMetadata *metadata);
+ rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice
&ns_key, ListMetadata *metadata);
rocksdb::Status push(const Slice &user_key, const std::vector<Slice> &elems,
bool create_if_missing, bool left,
uint64_t *new_size);
rocksdb::Status lmoveOnSingleList(const Slice &src, bool src_left, bool
dst_left, std::string *elem);
diff --git a/src/types/redis_set.cc b/src/types/redis_set.cc
index 35403b88..2e5074ee 100644
--- a/src/types/redis_set.cc
+++ b/src/types/redis_set.cc
@@ -29,8 +29,8 @@
namespace redis {
-rocksdb::Status Set::GetMetadata(const Slice &ns_key, SetMetadata *metadata) {
- return Database::GetMetadata({kRedisSet}, ns_key, metadata);
+rocksdb::Status Set::GetMetadata(Database::GetOptions get_options, const Slice
&ns_key, SetMetadata *metadata) {
+ return Database::GetMetadata(get_options, {kRedisSet}, ns_key, metadata);
}
// Make sure members are uniq before use Overwrite
@@ -60,7 +60,7 @@ rocksdb::Status Set::Add(const Slice &user_key, const
std::vector<Slice> &member
LockGuard guard(storage_->GetLockManager(), ns_key);
SetMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
std::string value;
@@ -94,7 +94,7 @@ rocksdb::Status Set::Remove(const Slice &user_key, const
std::vector<Slice> &mem
LockGuard guard(storage_->GetLockManager(), ns_key);
SetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string value;
@@ -130,7 +130,7 @@ rocksdb::Status Set::Card(const Slice &user_key, uint64_t
*size) {
std::string ns_key = AppendNamespacePrefix(user_key);
SetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
*size = metadata.size;
return rocksdb::Status::OK();
@@ -142,14 +142,15 @@ rocksdb::Status Set::Members(const Slice &user_key,
std::vector<std::string> *me
std::string ns_key = AppendNamespacePrefix(user_key);
SetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+
+ rocksdb::Status s = GetMetadata(Database::GetOptions{.snapshot =
ss.GetSnapShot()}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string prefix = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version +
1, storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;
@@ -176,11 +177,12 @@ rocksdb::Status Set::MIsMember(const Slice &user_key,
const std::vector<Slice> &
std::string ns_key = AppendNamespacePrefix(user_key);
SetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(Database::GetOptions{.snapshot =
ss.GetSnapShot()}, ns_key, &metadata);
if (!s.ok()) return s;
rocksdb::ReadOptions read_options;
- LatestSnapShot ss(storage_);
+
read_options.snapshot = ss.GetSnapShot();
std::string value;
for (const auto &member : members) {
@@ -212,7 +214,7 @@ rocksdb::Status Set::Take(const Slice &user_key,
std::vector<std::string> *membe
if (pop) lock_guard.emplace(storage_->GetLockManager(), ns_key);
SetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
ObserverOrUniquePtr<rocksdb::WriteBatchBase> batch =
storage_->GetWriteBatchBase();
diff --git a/src/types/redis_set.h b/src/types/redis_set.h
index 739c3077..bfd8bd3c 100644
--- a/src/types/redis_set.h
+++ b/src/types/redis_set.h
@@ -52,7 +52,7 @@ class Set : public SubKeyScanner {
const std::string &member_prefix,
std::vector<std::string> *members);
private:
- rocksdb::Status GetMetadata(const Slice &ns_key, SetMetadata *metadata);
+ rocksdb::Status GetMetadata(Database::GetOptions options, const Slice
&ns_key, SetMetadata *metadata);
};
} // namespace redis
diff --git a/src/types/redis_sortedint.cc b/src/types/redis_sortedint.cc
index 6670eb74..4edcff1c 100644
--- a/src/types/redis_sortedint.cc
+++ b/src/types/redis_sortedint.cc
@@ -28,8 +28,9 @@
namespace redis {
-rocksdb::Status Sortedint::GetMetadata(const Slice &ns_key, SortedintMetadata
*metadata) {
- return Database::GetMetadata({kRedisSortedint}, ns_key, metadata);
+rocksdb::Status Sortedint::GetMetadata(Database::GetOptions get_options, const
Slice &ns_key,
+ SortedintMetadata *metadata) {
+ return Database::GetMetadata(get_options, {kRedisSortedint}, ns_key,
metadata);
}
rocksdb::Status Sortedint::Add(const Slice &user_key, const
std::vector<uint64_t> &ids, uint64_t *added_cnt) {
@@ -39,7 +40,7 @@ rocksdb::Status Sortedint::Add(const Slice &user_key, const
std::vector<uint64_t
LockGuard guard(storage_->GetLockManager(), ns_key);
SortedintMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
std::string value;
@@ -72,7 +73,7 @@ rocksdb::Status Sortedint::Remove(const Slice &user_key,
const std::vector<uint6
LockGuard guard(storage_->GetLockManager(), ns_key);
SortedintMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string value;
@@ -101,7 +102,7 @@ rocksdb::Status Sortedint::Card(const Slice &user_key,
uint64_t *size) {
std::string ns_key = AppendNamespacePrefix(user_key);
SortedintMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
*size = metadata.size;
return rocksdb::Status::OK();
@@ -114,7 +115,8 @@ rocksdb::Status Sortedint::Range(const Slice &user_key,
uint64_t cursor_id, uint
std::string ns_key = AppendNamespacePrefix(user_key);
SortedintMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string start_buf;
@@ -128,7 +130,6 @@ rocksdb::Status Sortedint::Range(const Slice &user_key,
uint64_t cursor_id, uint
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version +
1, storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;
@@ -157,7 +158,8 @@ rocksdb::Status Sortedint::RangeByValue(const Slice
&user_key, SortedintRangeSpe
std::string ns_key = AppendNamespacePrefix(user_key);
SortedintMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string start_buf;
@@ -168,7 +170,6 @@ rocksdb::Status Sortedint::RangeByValue(const Slice
&user_key, SortedintRangeSpe
InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
@@ -207,10 +208,10 @@ rocksdb::Status Sortedint::MExist(const Slice &user_key,
const std::vector<uint6
std::string ns_key = AppendNamespacePrefix(user_key);
SortedintMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s;
- LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string value;
diff --git a/src/types/redis_sortedint.h b/src/types/redis_sortedint.h
index 38f0ca8d..c7f10feb 100644
--- a/src/types/redis_sortedint.h
+++ b/src/types/redis_sortedint.h
@@ -50,7 +50,7 @@ class Sortedint : public Database {
static Status ParseRangeSpec(const std::string &min, const std::string &max,
SortedintRangeSpec *spec);
private:
- rocksdb::Status GetMetadata(const Slice &ns_key, SortedintMetadata
*metadata);
+ rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice
&ns_key, SortedintMetadata *metadata);
};
} // namespace redis
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index aedcdfe8..1b9444ed 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -44,15 +44,16 @@ const char *errXGroupSubcommandRequiresKeyExist =
"The XGROUP subcommand requires the key to exist.\
Note that for CREATE you may want to use the MKSTREAM option to create an
empty stream automatically.";
-rocksdb::Status Stream::GetMetadata(const Slice &stream_name, StreamMetadata
*metadata) {
- return Database::GetMetadata({kRedisStream}, stream_name, metadata);
+rocksdb::Status Stream::GetMetadata(Database::GetOptions get_options, const
Slice &stream_name,
+ StreamMetadata *metadata) {
+ return Database::GetMetadata(get_options, {kRedisStream}, stream_name,
metadata);
}
rocksdb::Status Stream::GetLastGeneratedID(const Slice &stream_name,
StreamEntryID *id) {
std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
@@ -99,7 +100,7 @@ rocksdb::Status Stream::Add(const Slice &stream_name, const
StreamAddOptions &op
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
if (s.IsNotFound() && options.nomkstream) {
@@ -322,7 +323,7 @@ rocksdb::Status Stream::DeletePelEntries(const Slice
&stream_name, const std::st
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
@@ -365,7 +366,7 @@ rocksdb::Status Stream::CreateGroup(const Slice
&stream_name, const StreamXGroup
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
@@ -414,7 +415,7 @@ rocksdb::Status Stream::DestroyGroup(const Slice
&stream_name, const std::string
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
@@ -465,7 +466,7 @@ rocksdb::Status Stream::createConsumerWithoutLock(const
Slice &stream_name, cons
}
std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
@@ -522,7 +523,7 @@ rocksdb::Status Stream::DestroyConsumer(const Slice
&stream_name, const std::str
std::string ns_key = AppendNamespacePrefix(stream_name);
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
@@ -593,7 +594,7 @@ rocksdb::Status Stream::GroupSetId(const Slice
&stream_name, const std::string &
std::string ns_key = AppendNamespacePrefix(stream_name);
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
@@ -639,7 +640,7 @@ rocksdb::Status Stream::DeleteEntries(const Slice
&stream_name, const std::vecto
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
@@ -728,7 +729,7 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const
StreamLenOptions &op
std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
@@ -877,7 +878,7 @@ rocksdb::Status Stream::GetStreamInfo(const rocksdb::Slice
&stream_name, bool fu
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
info->size = metadata.size;
@@ -1001,7 +1002,7 @@ rocksdb::Status Stream::GetGroupInfo(const Slice
&stream_name,
std::vector<std::pair<std::string,
StreamConsumerGroupMetadata>> &group_metadata) {
std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
std::string next_version_prefix_key =
@@ -1034,7 +1035,7 @@ rocksdb::Status Stream::GetConsumerInfo(
std::vector<std::pair<std::string, StreamConsumerMetadata>>
&consumer_metadata) {
std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
std::string next_version_prefix_key =
@@ -1082,7 +1083,7 @@ rocksdb::Status Stream::Range(const Slice &stream_name,
const StreamRangeOptions
std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
@@ -1111,7 +1112,7 @@ rocksdb::Status Stream::RangeWithPending(const Slice
&stream_name, StreamRangeOp
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
@@ -1242,7 +1243,7 @@ rocksdb::Status Stream::Trim(const Slice &stream_name,
const StreamTrimOptions &
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
@@ -1347,7 +1348,7 @@ rocksdb::Status Stream::SetId(const Slice &stream_name,
const StreamEntryID &las
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index f569c253..662f93f7 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -62,7 +62,7 @@ class Stream : public SubKeyScanner {
std::vector<StreamEntry> *entries,
std::string &group_name,
std::string &consumer_name, bool noack,
bool latest);
rocksdb::Status Trim(const Slice &stream_name, const StreamTrimOptions
&options, uint64_t *delete_cnt);
- rocksdb::Status GetMetadata(const Slice &stream_name, StreamMetadata
*metadata);
+ rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice
&stream_name, StreamMetadata *metadata);
rocksdb::Status GetLastGeneratedID(const Slice &stream_name, StreamEntryID
*id);
rocksdb::Status SetId(const Slice &stream_name, const StreamEntryID
&last_generated_id,
std::optional<uint64_t> entries_added,
std::optional<StreamEntryID> max_deleted_id);
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index 38df40b5..30c09dd3 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -62,7 +62,7 @@ std::vector<rocksdb::Status> String::getRawValues(const
std::vector<Slice> &keys
rocksdb::Status String::getRawValue(const std::string &ns_key, std::string
*raw_value) {
raw_value->clear();
- auto s = GetRawMetadata(ns_key, raw_value);
+ auto s = GetRawMetadata(GetOptions{}, ns_key, raw_value);
if (!s.ok()) return s;
Metadata metadata(kRedisNone, false);
diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc
index 1dd2feb5..8e08ea4e 100644
--- a/src/types/redis_zset.cc
+++ b/src/types/redis_zset.cc
@@ -32,8 +32,8 @@
namespace redis {
-rocksdb::Status ZSet::GetMetadata(const Slice &ns_key, ZSetMetadata *metadata)
{
- return Database::GetMetadata({kRedisZSet}, ns_key, metadata);
+rocksdb::Status ZSet::GetMetadata(Database::GetOptions get_options, const
Slice &ns_key, ZSetMetadata *metadata) {
+ return Database::GetMetadata(get_options, {kRedisZSet}, ns_key, metadata);
}
rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, MemberScores
*mscores, uint64_t *added_cnt) {
@@ -43,7 +43,7 @@ rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags
flags, MemberScores *
LockGuard guard(storage_->GetLockManager(), ns_key);
ZSetMetadata metadata;
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
int added = 0;
@@ -52,7 +52,7 @@ rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags
flags, MemberScores *
WriteBatchLogData log_data(kRedisZSet);
batch->PutLogData(log_data.Encode());
std::unordered_set<std::string_view> added_member_keys;
- for (auto it = mscores->rbegin(); it != mscores->rend(); it++) {
+ for (auto it = mscores->rbegin(); it != mscores->rend(); ++it) {
if (!added_member_keys.insert(it->member).second) {
continue;
}
@@ -125,7 +125,7 @@ rocksdb::Status ZSet::Card(const Slice &user_key, uint64_t
*size) {
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
*size = metadata.size;
return rocksdb::Status::OK();
@@ -152,7 +152,7 @@ rocksdb::Status ZSet::Pop(const Slice &user_key, int count,
bool min, MemberScor
LockGuard guard(storage_->GetLockManager(), ns_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
if (count <= 0) return rocksdb::Status::OK();
if (count > static_cast<int>(metadata.size)) count =
static_cast<int>(metadata.size);
@@ -216,7 +216,8 @@ rocksdb::Status ZSet::RangeByRank(const Slice &user_key,
const RangeRankSpec &sp
std::optional<LockGuard> lock_guard;
if (spec.with_deletion) lock_guard.emplace(storage_->GetLockManager(),
ns_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
int start = spec.start;
@@ -239,7 +240,6 @@ rocksdb::Status ZSet::RangeByRank(const Slice &user_key,
const RangeRankSpec &sp
int removed_subkey = 0;
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
@@ -296,7 +296,7 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key,
const RangeScoreSpec &
std::optional<LockGuard> lock_guard;
if (spec.with_deletion) lock_guard.emplace(storage_->GetLockManager(),
ns_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
// let's get familiar with score first:
@@ -419,7 +419,7 @@ rocksdb::Status ZSet::RangeByLex(const Slice &user_key,
const RangeLexSpec &spec
lock_guard.emplace(storage_->GetLockManager(), ns_key);
}
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string start_member = spec.reversed ? spec.max : spec.min;
@@ -493,7 +493,7 @@ rocksdb::Status ZSet::RangeByLex(const Slice &user_key,
const RangeLexSpec &spec
rocksdb::Status ZSet::Score(const Slice &user_key, const Slice &member, double
*score) {
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
rocksdb::ReadOptions read_options;
@@ -514,7 +514,7 @@ rocksdb::Status ZSet::Remove(const Slice &user_key, const
std::vector<Slice> &me
LockGuard guard(storage_->GetLockManager(), ns_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
auto batch = storage_->GetWriteBatchBase();
@@ -554,11 +554,11 @@ rocksdb::Status ZSet::Rank(const Slice &user_key, const
Slice &member, bool reve
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
std::string score_bytes;
std::string member_key = InternalKey(ns_key, member, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
@@ -831,11 +831,11 @@ rocksdb::Status ZSet::MGet(const Slice &user_key, const
std::vector<Slice> &memb
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s;
rocksdb::ReadOptions read_options;
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
std::string score_bytes;
for (const auto &member : members) {
@@ -856,7 +856,8 @@ rocksdb::Status ZSet::GetAllMemberScores(const Slice
&user_key, std::vector<Memb
member_scores->clear();
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{.snapshot = ss.GetSnapShot()},
ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
@@ -864,7 +865,6 @@ rocksdb::Status ZSet::GetAllMemberScores(const Slice
&user_key, std::vector<Memb
InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix_key);
@@ -896,7 +896,7 @@ rocksdb::Status ZSet::RandMember(const Slice &user_key,
int64_t command_count,
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
- rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
if (metadata.size == 0) return rocksdb::Status::OK();
diff --git a/src/types/redis_zset.h b/src/types/redis_zset.h
index d806d57e..e446a7f1 100644
--- a/src/types/redis_zset.h
+++ b/src/types/redis_zset.h
@@ -119,7 +119,7 @@ class ZSet : public SubKeyScanner {
rocksdb::Status Diff(const std::vector<Slice> &keys, MemberScores *members);
rocksdb::Status DiffStore(const Slice &dst, const std::vector<Slice> &keys,
uint64_t *stored_count);
rocksdb::Status MGet(const Slice &user_key, const std::vector<Slice>
&members, std::map<std::string, double> *scores);
- rocksdb::Status GetMetadata(const Slice &ns_key, ZSetMetadata *metadata);
+ rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice
&ns_key, ZSetMetadata *metadata);
rocksdb::Status Count(const Slice &user_key, const RangeScoreSpec &spec,
uint64_t *size);
rocksdb::Status RangeByRank(const Slice &user_key, const RangeRankSpec
&spec, MemberScores *mscores,
diff --git a/tests/cppunit/metadata_test.cc b/tests/cppunit/metadata_test.cc
index 5e3e60ee..bd728205 100644
--- a/tests/cppunit/metadata_test.cc
+++ b/tests/cppunit/metadata_test.cc
@@ -92,7 +92,7 @@ TEST_F(RedisTypeTest, GetMetadata) {
EXPECT_TRUE(s.ok() && fvs.size() == ret);
HashMetadata metadata;
std::string ns_key = redis_->AppendNamespacePrefix(key_);
- s = redis_->GetMetadata({kRedisHash}, ns_key, &metadata);
+ s = redis_->GetMetadata(redis::Database::GetOptions{}, {kRedisHash}, ns_key,
&metadata);
EXPECT_EQ(fvs.size(), metadata.size);
s = redis_->Del(key_);
EXPECT_TRUE(s.ok());