This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 83c0c0ed Trying to set rocksdb iterator lower_bound if possible (#2275)
83c0c0ed is described below

commit 83c0c0ede0e5e24e989ba2e594215ffda65b3477
Author: mwish <[email protected]>
AuthorDate: Sat Apr 27 12:11:40 2024 +0800

    Trying to set rocksdb iterator lower_bound if possible (#2275)
---
 src/cluster/slot_migrate.cc | 32 ++++++++++++++++++++------------
 src/storage/redis_db.cc     | 13 ++-----------
 src/storage/storage.cc      | 20 ++++++++++----------
 src/storage/storage.h       |  4 ++--
 src/types/redis_bitmap.cc   |  2 ++
 5 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 884c8ace..a9c466a4 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -334,15 +334,17 @@ Status SlotMigrator::sendSnapshotByCmd() {
 
   LOG(INFO) << "[migrate] Start migrating snapshot of slot " << slot;
 
+  // Construct key prefix to iterate the keys belong to the target slot
+  std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
+  LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
+
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
+  Slice prefix_slice(prefix);
+  read_options.iterate_lower_bound = &prefix_slice;
   rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
   auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
 
-  // Construct key prefix to iterate the keys belong to the target slot
-  std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
-  LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
-
   // Seek to the beginning of keys start with 'prefix' and iterate all these keys
   for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
     // The migrating task has to be stopped, if server role is changed from master to slave
@@ -738,14 +740,16 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
   cmd = type_to_cmd[metadata.Type()];
 
   std::vector<std::string> user_cmd = {cmd, key.ToString()};
+  // Construct key prefix to iterate values of the complex type user key
+  std::string slot_key = AppendNamespacePrefix(key);
+  std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
+  Slice prefix_slice(prefix_subkey);
+  read_options.iterate_lower_bound = &prefix_slice;
   // Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
   auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options));
 
-  // Construct key prefix to iterate values of the complex type user key
-  std::string slot_key = AppendNamespacePrefix(key);
-  std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();
   int item_count = 0;
 
   for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
@@ -840,13 +844,15 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
 Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metadata, std::string *restore_cmds) {
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
-  // Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
-  auto iter = util::UniqueIterator(
-      storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(engine::kStreamColumnFamilyName)));
-
   std::string ns_key = AppendNamespacePrefix(key);
   // Construct key prefix to iterate values of the stream
   std::string prefix_key = InternalKey(ns_key, "", metadata.version, true).Encode();
+  rocksdb::Slice prefix_key_slice(prefix_key);
+  read_options.iterate_lower_bound = &prefix_key_slice;
+
+  // Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
+  auto iter = util::UniqueIterator(
+      storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(engine::kStreamColumnFamilyName)));
 
   std::vector<std::string> user_cmd = {type_to_cmd[metadata.Type()], key.ToString()};
 
@@ -1197,10 +1203,12 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   uint64_t start_ts = util::GetTimeStampMS();
   LOG(INFO) << "[migrate] Migrating snapshot of slot " << migrating_slot_ << " by raw key value";
 
+  auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
+  rocksdb::Slice prefix_slice(prefix);
+  read_options.iterate_lower_bound = &prefix_slice;
   engine::DBIterator iter(storage_, read_options);
-  auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
 
   BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);
 
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index a0e06615..3ff1fa0a 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -433,12 +433,7 @@ rocksdb::Status Database::FlushDB() {
   if (!s.ok()) {
     return rocksdb::Status::OK();
   }
-  s = storage_->DeleteRange(begin_key, end_key);
-  if (!s.ok()) {
-    return s;
-  }
-
-  return rocksdb::Status::OK();
+  return storage_->DeleteRange(begin_key, end_key);
 }
 
 rocksdb::Status Database::FlushAll() {
@@ -456,11 +451,7 @@ rocksdb::Status Database::FlushAll() {
     return rocksdb::Status::OK();
   }
   auto last_key = iter->key().ToString();
-  auto s = storage_->DeleteRange(first_key, last_key);
-  if (!s.ok()) {
-    return s;
-  }
-  return rocksdb::Status::OK();
+  return storage_->DeleteRange(first_key, last_key);
 }
 
 rocksdb::Status Database::Dump(const Slice &user_key, std::vector<std::string> *infos) {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 2133645c..1759d2e7 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -101,11 +101,11 @@ void Storage::CloseDB() {
 }
 
 void Storage::SetWriteOptions(const Config::RocksDB::WriteOptions &config) {
-  write_opts_.sync = config.sync;
-  write_opts_.disableWAL = config.disable_wal;
-  write_opts_.no_slowdown = config.no_slowdown;
-  write_opts_.low_pri = config.low_pri;
-  write_opts_.memtable_insert_hint_per_batch = config.memtable_insert_hint_per_batch;
+  default_write_opts_.sync = config.sync;
+  default_write_opts_.disableWAL = config.disable_wal;
+  default_write_opts_.no_slowdown = config.no_slowdown;
+  default_write_opts_.low_pri = config.low_pri;
+  default_write_opts_.memtable_insert_hint_per_batch = config.memtable_insert_hint_per_batch;
 }
 
 rocksdb::ReadOptions Storage::DefaultScanOptions() const {
@@ -523,7 +523,7 @@ Status Storage::RestoreFromCheckpoint() {
 
 bool Storage::IsEmptyDB() {
   std::unique_ptr<rocksdb::Iterator> iter(
-      db_->NewIterator(rocksdb::ReadOptions(), GetCFHandle(kMetadataColumnFamilyName)));
+      db_->NewIterator(DefaultScanOptions(), GetCFHandle(kMetadataColumnFamilyName)));
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     Metadata metadata(kRedisNone, false);
     // If cannot decode the metadata we think the key is alive, so the db is not empty
@@ -688,7 +688,7 @@ rocksdb::Status Storage::DeleteRange(const std::string &first_key, const std::st
     return s;
   }
 
-  return Write(write_opts_, batch->GetWriteBatch());
+  return Write(default_write_opts_, batch->GetWriteBatch());
 }
 
 rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle) {
@@ -707,7 +707,7 @@ rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rock
 }
 
 Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
-  return ApplyWriteBatch(write_opts_, std::move(raw_batch));
+  return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
 }
 
 Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
@@ -845,7 +845,7 @@ Status Storage::CommitTxn() {
     return Status{Status::NotOK, "cannot commit while not in transaction mode"};
   }
 
-  auto s = writeToDB(write_opts_, txn_write_batch_->GetWriteBatch());
+  auto s = writeToDB(default_write_opts_, txn_write_batch_->GetWriteBatch());
 
   is_txn_mode_ = false;
   txn_write_batch_ = nullptr;
@@ -869,7 +869,7 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va
   auto batch = GetWriteBatchBase();
   auto cf = GetCFHandle(kPropagateColumnFamilyName);
   batch->Put(cf, key, value);
-  auto s = Write(write_opts_, batch->GetWriteBatch());
+  auto s = Write(default_write_opts_, batch->GetWriteBatch());
   if (!s.ok()) {
     return {Status::NotOK, s.ToString()};
   }
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 44888f1c..208499b5 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -163,7 +163,7 @@ class Storage {
   rocksdb::Iterator *NewIterator(const rocksdb::ReadOptions &options);
 
   [[nodiscard]] rocksdb::Status Write(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
-  const rocksdb::WriteOptions &DefaultWriteOptions() { return write_opts_; }
+  const rocksdb::WriteOptions &DefaultWriteOptions() { return default_write_opts_; }
   rocksdb::ReadOptions DefaultScanOptions() const;
   rocksdb::ReadOptions DefaultMultiGetOptions() const;
   [[nodiscard]] rocksdb::Status Delete(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle,
@@ -279,7 +279,7 @@ class Storage {
   // command, so it won't have multi transactions to be executed at the same time.
   std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;
 
-  rocksdb::WriteOptions write_opts_ = rocksdb::WriteOptions();
+  rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
 
   rocksdb::Status writeToDB(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
   void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index 3651f910..9a08c1fe 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -157,6 +157,8 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key, const uint32_t max_btos
 
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = ss.GetSnapShot();
+  Slice prefix_key_slice(prefix_key);
+  read_options.iterate_lower_bound = &prefix_key_slice;
 
   auto iter = util::UniqueIterator(storage_, read_options);
   for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) {

Reply via email to