PragmaTwice commented on code in PR #2954:
URL: https://github.com/apache/kvrocks/pull/2954#discussion_r2106753542
##########
src/server/server.cc:
##########
@@ -2126,3 +2127,149 @@ AuthResult Server::AuthenticateUser(const std::string
&user_password, std::strin
*ns = kDefaultNamespace;
return AuthResult::IS_ADMIN;
}
+
+Status Server::GetSlotStats(const std::vector<SlotRange> &slot_ranges,
std::vector<std::string> *v_stats) {
+ std::lock_guard<std::mutex> lg(db_job_mu_);
+ std::bitset<HASH_SLOTS_SIZE> checked_slots;
+ uint64_t total_keys = 0;
+ uint64_t total_unexpected_keys = 0;
+ for (auto slot_range : slot_ranges) {
+ for (int slot = slot_range.start; slot <= slot_range.end; slot++) {
+ if (checked_slots.test(slot)) {
+ continue;
+ } else {
+ checked_slots.set(slot);
+ }
+
+ if (slot_scan_infos_.slot_stats.find(slot) ==
slot_scan_infos_.slot_stats.end()) {
+ v_stats->emplace_back(fmt::format("slot: {}, keys: {}, unexpected
keys: {}", slot, 0, 0));
+ } else {
+ SlotStats ss = slot_scan_infos_.slot_stats[slot];
+ v_stats->emplace_back(
+ fmt::format("slot: {}, keys: {}, unexpected keys: {}", slot,
ss.n_key, ss.n_unexpected_key));
+ total_keys += ss.n_key;
+ total_unexpected_keys += ss.n_unexpected_key;
+ }
+ }
+ }
+ v_stats->emplace_back(fmt::format("total keys: {}, total unexpected keys:
{}", total_keys, total_unexpected_keys));
+ return Status::OK();
+}
+
+Status Server::AsyncScanSlots(const std::vector<SlotRange> &slot_ranges) {
+ std::lock_guard<std::mutex> lg(db_job_mu_);
+ if (slot_scan_infos_.is_scanning) {
+ return {Status::NotOK, fmt::format("scanning the slot {} now",
slot_scan_infos_.scanning_slot_id)};
+ }
+ slot_scan_infos_.is_scanning = true;
+
+ return task_runner_.TryPublish([slot_ranges, this] {
+ rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
+ auto snapshot = storage->GetDB()->GetSnapshot();
+ if (!snapshot) {
+ error("[slotsize] Get DB Snapshot error");
+ return;
+ }
+ read_options.snapshot = snapshot;
+ auto no_txn_ctx = engine::Context::NoTransactionContext(storage);
+ bool is_slot_id_encoded = storage->IsSlotIdEncoded();
+ std::bitset<HASH_SLOTS_SIZE> checked_slots;
+ std::vector<SlotStats> slot_stats;
+ for (auto slot_range : slot_ranges) {
+ for (int slot = slot_range.start; slot <= slot_range.end; slot++) {
+ if (checked_slots.test(slot)) {
+ continue;
+ } else {
+ checked_slots.set(slot);
+ slot_scan_infos_.scanning_slot_id = slot;
+ }
+
+ uint64_t start_ts = util::GetTimeStampMS();
+ auto prefix = ComposeSlotKeyPrefix(kDefaultNamespace, slot);
+ auto upper_bound = ComposeSlotKeyUpperBound(kDefaultNamespace, slot);
+ rocksdb::Slice prefix_slice(prefix);
+ rocksdb::Slice upper_bound_slice(upper_bound);
+ read_options.iterate_lower_bound = &prefix_slice;
+ read_options.iterate_upper_bound = &upper_bound_slice;
+ engine::DBIterator iter(no_txn_ctx, read_options);
+ uint64_t n_keys = 0;
+ uint64_t unexpected_keys = 0;
+ for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
+ auto key_slot_id = ExtractSlotId(iter.Key());
+ auto [_, user_key] = ExtractNamespaceKey<std::string>(iter.Key(),
is_slot_id_encoded);
+ if (slot != key_slot_id) {
+ unexpected_keys++;
+ } else {
+ n_keys++;
+ }
+ }
+ if (unexpected_keys > 0) {
+ error("[slotsize] Slot {} has {} unexpected key(s)", slot,
unexpected_keys);
+ }
+ slot_stats.emplace_back(SlotStats{static_cast<uint16_t>(slot), n_keys,
unexpected_keys});
+ auto elapsed = util::GetTimeStampMS() - start_ts;
+ info("[slotsize] Succeed to scan slot: {}, elapsed: {} ms, keys: {},
unexpected keys: {}", slot, elapsed,
+ n_keys, unexpected_keys);
+ }
+ }
+ storage->GetDB()->ReleaseSnapshot(snapshot);
+
+ std::lock_guard<std::mutex> lg(db_job_mu_);
+ for (SlotStats ss : slot_stats) {
+ slot_scan_infos_.slot_stats[ss.slot_id] = ss;
+ }
+ slot_scan_infos_.last_scan_time_secs = util::GetTimeStamp();
+ slot_scan_infos_.is_scanning = false;
+ slot_scan_infos_.scanning_slot_id = -1;
+ });
+}
+
+Status Server::DumpSlotKeys(const std::vector<SlotRange> &slot_ranges) const {
+ // only supports one slot a time
+ int dump_slot = slot_ranges[0].start;
+ for (auto slot_range : slot_ranges) {
+ for (int slot = slot_range.start; slot <= slot_range.end; slot++) {
+ if (slot != dump_slot) {
+ return {Status::NotOK, "only supports one slot a time"};
+ }
+ }
+ }
Review Comment:
If it only supports one slot at a time, we can just accept a single integer slot instead of a vector of slot ranges.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]