PragmaTwice commented on code in PR #2954:
URL: https://github.com/apache/kvrocks/pull/2954#discussion_r2106750100


##########
src/server/server.cc:
##########
@@ -2126,3 +2127,149 @@ AuthResult Server::AuthenticateUser(const std::string 
&user_password, std::strin
   *ns = kDefaultNamespace;
   return AuthResult::IS_ADMIN;
 }
+
+Status Server::GetSlotStats(const std::vector<SlotRange> &slot_ranges, 
std::vector<std::string> *v_stats) {
+  std::lock_guard<std::mutex> lg(db_job_mu_);
+  std::bitset<HASH_SLOTS_SIZE> checked_slots;
+  uint64_t total_keys = 0;
+  uint64_t total_unexpected_keys = 0;
+  for (auto slot_range : slot_ranges) {
+    for (int slot = slot_range.start; slot <= slot_range.end; slot++) {
+      if (checked_slots.test(slot)) {
+        continue;
+      } else {
+        checked_slots.set(slot);
+      }
+
+      if (slot_scan_infos_.slot_stats.find(slot) == 
slot_scan_infos_.slot_stats.end()) {
+        v_stats->emplace_back(fmt::format("slot: {}, keys: {}, unexpected 
keys: {}", slot, 0, 0));
+      } else {
+        SlotStats ss = slot_scan_infos_.slot_stats[slot];
+        v_stats->emplace_back(
+            fmt::format("slot: {}, keys: {}, unexpected keys: {}", slot, 
ss.n_key, ss.n_unexpected_key));
+        total_keys += ss.n_key;
+        total_unexpected_keys += ss.n_unexpected_key;
+      }
+    }
+  }
+  v_stats->emplace_back(fmt::format("total keys: {}, total unexpected keys: 
{}", total_keys, total_unexpected_keys));
+  return Status::OK();
+}
+
+Status Server::AsyncScanSlots(const std::vector<SlotRange> &slot_ranges) {
+  std::lock_guard<std::mutex> lg(db_job_mu_);
+  if (slot_scan_infos_.is_scanning) {
+    return {Status::NotOK, fmt::format("scanning the slot {} now", 
slot_scan_infos_.scanning_slot_id)};
+  }
+  slot_scan_infos_.is_scanning = true;
+
+  return task_runner_.TryPublish([slot_ranges, this] {
+    rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
+    auto snapshot = storage->GetDB()->GetSnapshot();
+    if (!snapshot) {
+      error("[slotsize] Get DB Snapshot error");
+      return;
+    }
+    read_options.snapshot = snapshot;
+    auto no_txn_ctx = engine::Context::NoTransactionContext(storage);
+    bool is_slot_id_encoded = storage->IsSlotIdEncoded();
+    std::bitset<HASH_SLOTS_SIZE> checked_slots;
+    std::vector<SlotStats> slot_stats;
+    for (auto slot_range : slot_ranges) {
+      for (int slot = slot_range.start; slot <= slot_range.end; slot++) {
+        if (checked_slots.test(slot)) {
+          continue;
+        } else {
+          checked_slots.set(slot);
+          slot_scan_infos_.scanning_slot_id = slot;
+        }
+
+        uint64_t start_ts = util::GetTimeStampMS();
+        auto prefix = ComposeSlotKeyPrefix(kDefaultNamespace, slot);
+        auto upper_bound = ComposeSlotKeyUpperBound(kDefaultNamespace, slot);
+        rocksdb::Slice prefix_slice(prefix);
+        rocksdb::Slice upper_bound_slice(upper_bound);
+        read_options.iterate_lower_bound = &prefix_slice;
+        read_options.iterate_upper_bound = &upper_bound_slice;
+        engine::DBIterator iter(no_txn_ctx, read_options);
+        uint64_t n_keys = 0;
+        uint64_t unexpected_keys = 0;
+        for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
+          auto key_slot_id = ExtractSlotId(iter.Key());
+          auto [_, user_key] = ExtractNamespaceKey<std::string>(iter.Key(), 
is_slot_id_encoded);
+          if (slot != key_slot_id) {
+            unexpected_keys++;
+          } else {
+            n_keys++;
+          }
+        }
+        if (unexpected_keys > 0) {
+          error("[slotsize] Slot {} has {} unexpected key(s)", slot, 
unexpected_keys);
+        }
+        slot_stats.emplace_back(SlotStats{static_cast<uint16_t>(slot), n_keys, 
unexpected_keys});
+        auto elapsed = util::GetTimeStampMS() - start_ts;
+        info("[slotsize] Succeed to scan slot: {}, elapsed: {} ms, keys: {}, 
unexpected keys: {}", slot, elapsed,
+             n_keys, unexpected_keys);
+      }
+    }
+    storage->GetDB()->ReleaseSnapshot(snapshot);
+
+    std::lock_guard<std::mutex> lg(db_job_mu_);
+    for (SlotStats ss : slot_stats) {
+      slot_scan_infos_.slot_stats[ss.slot_id] = ss;
+    }
+    slot_scan_infos_.last_scan_time_secs = util::GetTimeStamp();
+    slot_scan_infos_.is_scanning = false;
+    slot_scan_infos_.scanning_slot_id = -1;
+  });
+}
+
+Status Server::DumpSlotKeys(const std::vector<SlotRange> &slot_ranges) const {
+  // only supports one slot a time
+  int dump_slot = slot_ranges[0].start;
+  for (auto slot_range : slot_ranges) {
+    for (int slot = slot_range.start; slot <= slot_range.end; slot++) {
+      if (slot != dump_slot) {
+        return {Status::NotOK, "only supports one slot a time"};
+      }
+    }
+  }
+
+  uint64_t start_ts = util::GetTimeStampMS();
+  spdlog::level::level_enum level = config_->log_level;
+  rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
+  auto snapshot = storage->GetDB()->GetSnapshot();
+  if (!snapshot) {
+    error("[slotsize] Get DB Snapshot error");
+    return {Status::NotOK, "get db snapshot error"};
+  }
+  read_options.snapshot = snapshot;
+  auto no_txn_ctx = engine::Context::NoTransactionContext(storage);
+  bool is_slot_id_encoded = storage->IsSlotIdEncoded();
+  auto prefix = ComposeSlotKeyPrefix(kDefaultNamespace, dump_slot);
+  auto upper_bound = ComposeSlotKeyUpperBound(kDefaultNamespace, dump_slot);
+  rocksdb::Slice prefix_slice(prefix);
+  rocksdb::Slice upper_bound_slice(upper_bound);
+  read_options.iterate_lower_bound = &prefix_slice;
+  read_options.iterate_upper_bound = &upper_bound_slice;
+  engine::DBIterator iter(no_txn_ctx, read_options);
+  uint64_t n_keys = 0;
+  uint64_t unexpected_keys = 0;
+  for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
+    auto key_slot_id = ExtractSlotId(iter.Key());
+    auto [_, user_key] = ExtractNamespaceKey<std::string>(iter.Key(), 
is_slot_id_encoded);
+    if (dump_slot != key_slot_id) {
+      unexpected_keys++;
+      warn("[slotsize] Slot {} has an unexpected key: {}", dump_slot, 
user_key);
+    } else {
+      n_keys++;
+      log(level, "[slotsize] dump slot: {}, type: {}, key: {}", dump_slot, 
RedisTypeNames[iter.Type()], user_key);
+    }
+  }
+  storage->GetDB()->ReleaseSnapshot(snapshot);
+  auto elapsed = util::GetTimeStampMS() - start_ts;
+  info("[slotsize] Succeed to dump slot: {}, elapsed: {} ms, keys: {}, 
unexpected keys: {}", dump_slot, elapsed, n_keys,
+       unexpected_keys);
+
+  return Status::OK();
+}

Review Comment:
   Please add a newline at the end of the file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to