This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 83c0c0ed Trying to set rocksdb iterator lower_bound if possible (#2275)
83c0c0ed is described below
commit 83c0c0ede0e5e24e989ba2e594215ffda65b3477
Author: mwish <[email protected]>
AuthorDate: Sat Apr 27 12:11:40 2024 +0800
Trying to set rocksdb iterator lower_bound if possible (#2275)
---
src/cluster/slot_migrate.cc | 32 ++++++++++++++++++++------------
src/storage/redis_db.cc | 13 ++-----------
src/storage/storage.cc | 20 ++++++++++----------
src/storage/storage.h | 4 ++--
src/types/redis_bitmap.cc | 2 ++
5 files changed, 36 insertions(+), 35 deletions(-)
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 884c8ace..a9c466a4 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -334,15 +334,17 @@ Status SlotMigrator::sendSnapshotByCmd() {
LOG(INFO) << "[migrate] Start migrating snapshot of slot " << slot;
+ // Construct key prefix to iterate the keys belong to the target slot
+ std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
+ LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
+
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
+ Slice prefix_slice(prefix);
+ read_options.iterate_lower_bound = &prefix_slice;
rocksdb::ColumnFamilyHandle *cf_handle =
storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
auto iter =
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
- // Construct key prefix to iterate the keys belong to the target slot
- std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
- LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
-
// Seek to the beginning of keys start with 'prefix' and iterate all these
keys
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
// The migrating task has to be stopped, if server role is changed from
master to slave
@@ -738,14 +740,16 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
cmd = type_to_cmd[metadata.Type()];
std::vector<std::string> user_cmd = {cmd, key.ToString()};
+ // Construct key prefix to iterate values of the complex type user key
+ std::string slot_key = AppendNamespacePrefix(key);
+ std::string prefix_subkey = InternalKey(slot_key, "", metadata.version,
true).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
+ Slice prefix_slice(prefix_subkey);
+ read_options.iterate_lower_bound = &prefix_slice;
// Should use th raw db iterator to avoid reading uncommitted writes in
transaction mode
auto iter =
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options));
- // Construct key prefix to iterate values of the complex type user key
- std::string slot_key = AppendNamespacePrefix(key);
- std::string prefix_subkey = InternalKey(slot_key, "", metadata.version,
true).Encode();
int item_count = 0;
for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
@@ -840,13 +844,15 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata
&metadata, std::string *restore_cmds) {
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
- // Should use th raw db iterator to avoid reading uncommitted writes in
transaction mode
- auto iter = util::UniqueIterator(
- storage_->GetDB()->NewIterator(read_options,
storage_->GetCFHandle(engine::kStreamColumnFamilyName)));
-
std::string ns_key = AppendNamespacePrefix(key);
// Construct key prefix to iterate values of the stream
std::string prefix_key = InternalKey(ns_key, "", metadata.version,
true).Encode();
+ rocksdb::Slice prefix_key_slice(prefix_key);
+ read_options.iterate_lower_bound = &prefix_key_slice;
+
+ // Should use th raw db iterator to avoid reading uncommitted writes in
transaction mode
+ auto iter = util::UniqueIterator(
+ storage_->GetDB()->NewIterator(read_options,
storage_->GetCFHandle(engine::kStreamColumnFamilyName)));
std::vector<std::string> user_cmd = {type_to_cmd[metadata.Type()],
key.ToString()};
@@ -1197,10 +1203,12 @@ Status SlotMigrator::sendSnapshotByRawKV() {
uint64_t start_ts = util::GetTimeStampMS();
LOG(INFO) << "[migrate] Migrating snapshot of slot " << migrating_slot_ << "
by raw key value";
+ auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
+ rocksdb::Slice prefix_slice(prefix);
+ read_options.iterate_lower_bound = &prefix_slice;
engine::DBIterator iter(storage_, read_options);
- auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_,
migrate_batch_bytes_per_sec_);
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index a0e06615..3ff1fa0a 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -433,12 +433,7 @@ rocksdb::Status Database::FlushDB() {
if (!s.ok()) {
return rocksdb::Status::OK();
}
- s = storage_->DeleteRange(begin_key, end_key);
- if (!s.ok()) {
- return s;
- }
-
- return rocksdb::Status::OK();
+ return storage_->DeleteRange(begin_key, end_key);
}
rocksdb::Status Database::FlushAll() {
@@ -456,11 +451,7 @@ rocksdb::Status Database::FlushAll() {
return rocksdb::Status::OK();
}
auto last_key = iter->key().ToString();
- auto s = storage_->DeleteRange(first_key, last_key);
- if (!s.ok()) {
- return s;
- }
- return rocksdb::Status::OK();
+ return storage_->DeleteRange(first_key, last_key);
}
rocksdb::Status Database::Dump(const Slice &user_key, std::vector<std::string>
*infos) {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 2133645c..1759d2e7 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -101,11 +101,11 @@ void Storage::CloseDB() {
}
void Storage::SetWriteOptions(const Config::RocksDB::WriteOptions &config) {
- write_opts_.sync = config.sync;
- write_opts_.disableWAL = config.disable_wal;
- write_opts_.no_slowdown = config.no_slowdown;
- write_opts_.low_pri = config.low_pri;
- write_opts_.memtable_insert_hint_per_batch =
config.memtable_insert_hint_per_batch;
+ default_write_opts_.sync = config.sync;
+ default_write_opts_.disableWAL = config.disable_wal;
+ default_write_opts_.no_slowdown = config.no_slowdown;
+ default_write_opts_.low_pri = config.low_pri;
+ default_write_opts_.memtable_insert_hint_per_batch =
config.memtable_insert_hint_per_batch;
}
rocksdb::ReadOptions Storage::DefaultScanOptions() const {
@@ -523,7 +523,7 @@ Status Storage::RestoreFromCheckpoint() {
bool Storage::IsEmptyDB() {
std::unique_ptr<rocksdb::Iterator> iter(
- db_->NewIterator(rocksdb::ReadOptions(),
GetCFHandle(kMetadataColumnFamilyName)));
+ db_->NewIterator(DefaultScanOptions(),
GetCFHandle(kMetadataColumnFamilyName)));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Metadata metadata(kRedisNone, false);
// If cannot decode the metadata we think the key is alive, so the db is
not empty
@@ -688,7 +688,7 @@ rocksdb::Status Storage::DeleteRange(const std::string &first_key, const std::st
return s;
}
- return Write(write_opts_, batch->GetWriteBatch());
+ return Write(default_write_opts_, batch->GetWriteBatch());
}
rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle) {
@@ -707,7 +707,7 @@ rocksdb::Status Storage::FlushScripts(const
rocksdb::WriteOptions &options, rock
}
Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
- return ApplyWriteBatch(write_opts_, std::move(raw_batch));
+ return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
}
Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options,
std::string &&raw_batch) {
@@ -845,7 +845,7 @@ Status Storage::CommitTxn() {
return Status{Status::NotOK, "cannot commit while not in transaction
mode"};
}
- auto s = writeToDB(write_opts_, txn_write_batch_->GetWriteBatch());
+ auto s = writeToDB(default_write_opts_, txn_write_batch_->GetWriteBatch());
is_txn_mode_ = false;
txn_write_batch_ = nullptr;
@@ -869,7 +869,7 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va
auto batch = GetWriteBatchBase();
auto cf = GetCFHandle(kPropagateColumnFamilyName);
batch->Put(cf, key, value);
- auto s = Write(write_opts_, batch->GetWriteBatch());
+ auto s = Write(default_write_opts_, batch->GetWriteBatch());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 44888f1c..208499b5 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -163,7 +163,7 @@ class Storage {
rocksdb::Iterator *NewIterator(const rocksdb::ReadOptions &options);
[[nodiscard]] rocksdb::Status Write(const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates);
- const rocksdb::WriteOptions &DefaultWriteOptions() { return write_opts_; }
+ const rocksdb::WriteOptions &DefaultWriteOptions() { return
default_write_opts_; }
rocksdb::ReadOptions DefaultScanOptions() const;
rocksdb::ReadOptions DefaultMultiGetOptions() const;
[[nodiscard]] rocksdb::Status Delete(const rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle,
@@ -279,7 +279,7 @@ class Storage {
// command, so it won't have multi transactions to be executed at the same
time.
std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;
- rocksdb::WriteOptions write_opts_ = rocksdb::WriteOptions();
+ rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
rocksdb::Status writeToDB(const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Status &s);
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index 3651f910..9a08c1fe 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -157,6 +157,8 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key, const uint32_t max_btos
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();
+ Slice prefix_key_slice(prefix_key);
+ read_options.iterate_lower_bound = &prefix_key_slice;
auto iter = util::UniqueIterator(storage_, read_options);
for (iter->Seek(prefix_key); iter->Valid() &&
iter->key().starts_with(prefix_key); iter->Next()) {