This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 571b0225 perf(cluster): add upper bound for scan during migration (#2724)
571b0225 is described below

commit 571b02253b9eff79ff94a7bc9235b84ca183ecd2
Author: Rivers <[email protected]>
AuthorDate: Sun Jan 19 11:40:46 2025 +0800

    perf(cluster): add upper bound for scan during migration (#2724)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/cluster/slot_migrate.cc   | 10 +++++++++-
 src/cluster/slot_migrate.h    |  5 -----
 src/config/config.cc          |  5 ++---
 src/storage/redis_metadata.cc |  2 ++
 src/storage/redis_metadata.h  |  1 +
 5 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 4b544cf7..4bd7e56c 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -29,6 +29,7 @@
 #include "io_util.h"
 #include "storage/batch_extractor.h"
 #include "storage/iterator.h"
+#include "storage/redis_metadata.h"
 #include "sync_migrate_context.h"
 #include "thread_util.h"
 #include "time_util.h"
@@ -343,10 +344,13 @@ Status SlotMigrator::sendSnapshotByCmd() {
   std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
   LOG(INFO) << "[migrate] Iterate keys of slot(s), key's prefix: " << prefix;
 
+  std::string upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
   Slice prefix_slice(prefix);
+  Slice upper_bound_slice(upper_bound);
   read_options.iterate_lower_bound = &prefix_slice;
+  read_options.iterate_upper_bound = &upper_bound_slice;
   rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(ColumnFamilyID::Metadata);
   auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
 
@@ -1267,13 +1271,17 @@ Status SlotMigrator::sendMigrationBatch(BatchSender *batch) {
 Status SlotMigrator::sendSnapshotByRawKV() {
   uint64_t start_ts = util::GetTimeStampMS();
   auto slot_range = slot_range_.load();
-  LOG(INFO) << "[migrate] Migrating snapshot of slot(s) " << slot_range.String() << " by raw key value";
+  LOG(INFO) << fmt::format("[migrate] Migrating snapshot of slot(s) {} by raw key value", slot_range.String());
 
   auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
+  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
   rocksdb::Slice prefix_slice(prefix);
+  rocksdb::Slice upper_bound_slice(upper_bound);
   read_options.iterate_lower_bound = &prefix_slice;
+  read_options.iterate_upper_bound = &upper_bound_slice;
   auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
   engine::DBIterator iter(no_txn_ctx, read_options);
 
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 6d323b6b..8b5cf493 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -26,8 +26,6 @@
 #include <rocksdb/transaction_log.h>
 #include <rocksdb/write_batch.h>
 
-#include <chrono>
-#include <map>
 #include <memory>
 #include <string>
 #include <thread>
@@ -35,10 +33,7 @@
 #include <vector>
 
 #include "batch_sender.h"
-#include "encoding.h"
-#include "parse_util.h"
 #include "server/server.h"
-#include "slot_import.h"
 #include "status.h"
 #include "storage/redis_db.h"
 #include "unique_fd.h"
diff --git a/src/config/config.cc b/src/config/config.cc
index 9d8e2a79..fe7e3fb0 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -24,13 +24,12 @@
 #include <rocksdb/env.h>
 #include <strings.h>
 
-#include <algorithm>
+#include <cstddef>
 #include <cstdint>
 #include <cstring>
 #include <fstream>
 #include <iostream>
 #include <iterator>
-#include <limits>
 #include <string>
 #include <utility>
 #include <vector>
@@ -125,7 +124,7 @@ Status SetRocksdbCompression(Server *srv, const rocksdb::CompressionType compres
   std::vector<std::string> compression_per_level_builder;
   compression_per_level_builder.reserve(KVROCKS_MAX_LSM_LEVEL);
 
-  for (int i = 0; i < compression_start_level; i++) {
+  for (size_t i = 0; i < compression_start_level; i++) {
     compression_per_level_builder.emplace_back("kNoCompression");
   }
   for (size_t i = compression_start_level; i < KVROCKS_MAX_LSM_LEVEL; i++) {
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 76403faa..9df10532 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -159,6 +159,8 @@ std::string ComposeSlotKeyPrefix(const Slice &ns, int slotid) {
   return output;
 }
 
+std::string ComposeSlotKeyUpperBound(const Slice &ns, int slotid) { return ComposeSlotKeyPrefix(ns, slotid + 1); }
+
 Metadata::Metadata(RedisType type, bool generate_version, bool use_64bit_common_field)
     : flags((use_64bit_common_field ? METADATA_64BIT_ENCODING_MASK : 0) | (METADATA_TYPE_MASK & type)),
       expire(0),
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 5590609b..cba9df80 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -112,6 +112,7 @@ template <typename T = Slice>
 [[nodiscard]] std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded);
 [[nodiscard]] std::string ComposeNamespaceKey(const Slice &ns, const Slice &key, bool slot_id_encoded);
 [[nodiscard]] std::string ComposeSlotKeyPrefix(const Slice &ns, int slotid);
+[[nodiscard]] std::string ComposeSlotKeyUpperBound(const Slice &ns, int slotid);
 
 class InternalKey {
  public:

Reply via email to