This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 571b0225 perf(cluster): add upper bound for scan during migration
(#2724)
571b0225 is described below
commit 571b02253b9eff79ff94a7bc9235b84ca183ecd2
Author: Rivers <[email protected]>
AuthorDate: Sun Jan 19 11:40:46 2025 +0800
perf(cluster): add upper bound for scan during migration (#2724)
Co-authored-by: Twice <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/cluster/slot_migrate.cc | 10 +++++++++-
src/cluster/slot_migrate.h | 5 -----
src/config/config.cc | 5 ++---
src/storage/redis_metadata.cc | 2 ++
src/storage/redis_metadata.h | 1 +
5 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 4b544cf7..4bd7e56c 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -29,6 +29,7 @@
#include "io_util.h"
#include "storage/batch_extractor.h"
#include "storage/iterator.h"
+#include "storage/redis_metadata.h"
#include "sync_migrate_context.h"
#include "thread_util.h"
#include "time_util.h"
@@ -343,10 +344,13 @@ Status SlotMigrator::sendSnapshotByCmd() {
std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
LOG(INFO) << "[migrate] Iterate keys of slot(s), key's prefix: " << prefix;
+ std::string upper_bound = ComposeSlotKeyUpperBound(namespace_,
slot_range.end);
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
Slice prefix_slice(prefix);
+ Slice upper_bound_slice(upper_bound);
read_options.iterate_lower_bound = &prefix_slice;
+ read_options.iterate_upper_bound = &upper_bound_slice;
rocksdb::ColumnFamilyHandle *cf_handle =
storage_->GetCFHandle(ColumnFamilyID::Metadata);
auto iter =
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
@@ -1267,13 +1271,17 @@ Status SlotMigrator::sendMigrationBatch(BatchSender
*batch) {
Status SlotMigrator::sendSnapshotByRawKV() {
uint64_t start_ts = util::GetTimeStampMS();
auto slot_range = slot_range_.load();
- LOG(INFO) << "[migrate] Migrating snapshot of slot(s) " <<
slot_range.String() << " by raw key value";
+ LOG(INFO) << fmt::format("[migrate] Migrating snapshot of slot(s) {} by raw
key value", slot_range.String());
auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
+ auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
rocksdb::Slice prefix_slice(prefix);
+ rocksdb::Slice upper_bound_slice(upper_bound);
read_options.iterate_lower_bound = &prefix_slice;
+ read_options.iterate_upper_bound = &upper_bound_slice;
auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
engine::DBIterator iter(no_txn_ctx, read_options);
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 6d323b6b..8b5cf493 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -26,8 +26,6 @@
#include <rocksdb/transaction_log.h>
#include <rocksdb/write_batch.h>
-#include <chrono>
-#include <map>
#include <memory>
#include <string>
#include <thread>
@@ -35,10 +33,7 @@
#include <vector>
#include "batch_sender.h"
-#include "encoding.h"
-#include "parse_util.h"
#include "server/server.h"
-#include "slot_import.h"
#include "status.h"
#include "storage/redis_db.h"
#include "unique_fd.h"
diff --git a/src/config/config.cc b/src/config/config.cc
index 9d8e2a79..fe7e3fb0 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -24,13 +24,12 @@
#include <rocksdb/env.h>
#include <strings.h>
-#include <algorithm>
+#include <cstddef>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <iostream>
#include <iterator>
-#include <limits>
#include <string>
#include <utility>
#include <vector>
@@ -125,7 +124,7 @@ Status SetRocksdbCompression(Server *srv, const
rocksdb::CompressionType compres
std::vector<std::string> compression_per_level_builder;
compression_per_level_builder.reserve(KVROCKS_MAX_LSM_LEVEL);
- for (int i = 0; i < compression_start_level; i++) {
+ for (size_t i = 0; i < compression_start_level; i++) {
compression_per_level_builder.emplace_back("kNoCompression");
}
for (size_t i = compression_start_level; i < KVROCKS_MAX_LSM_LEVEL; i++) {
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 76403faa..9df10532 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -159,6 +159,8 @@ std::string ComposeSlotKeyPrefix(const Slice &ns, int
slotid) {
return output;
}
+std::string ComposeSlotKeyUpperBound(const Slice &ns, int slotid) { return
ComposeSlotKeyPrefix(ns, slotid + 1); }
+
Metadata::Metadata(RedisType type, bool generate_version, bool
use_64bit_common_field)
: flags((use_64bit_common_field ? METADATA_64BIT_ENCODING_MASK : 0) |
(METADATA_TYPE_MASK & type)),
expire(0),
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 5590609b..cba9df80 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -112,6 +112,7 @@ template <typename T = Slice>
[[nodiscard]] std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool
slot_id_encoded);
[[nodiscard]] std::string ComposeNamespaceKey(const Slice &ns, const Slice
&key, bool slot_id_encoded);
[[nodiscard]] std::string ComposeSlotKeyPrefix(const Slice &ns, int slotid);
+[[nodiscard]] std::string ComposeSlotKeyUpperBound(const Slice &ns, int
slotid);
class InternalKey {
public: