This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new beb1979f feat(cluster): support migrate slot range (#2389)
beb1979f is described below
commit beb1979f14c5af2ca3b4f7a2f42bd0ce857ab85a
Author: Zhou SiLe <[email protected]>
AuthorDate: Sun Jul 14 21:19:21 2024 +0800
feat(cluster): support migrate slot range (#2389)
---
src/cluster/cluster.cc | 105 +++++++-----
src/cluster/cluster.h | 11 +-
src/cluster/cluster_defs.h | 35 +++-
src/cluster/slot_import.cc | 49 +++---
src/cluster/slot_import.h | 11 +-
src/cluster/slot_migrate.cc | 150 +++++++++--------
src/cluster/slot_migrate.h | 28 ++--
src/commands/cmd_cluster.cc | 20 ++-
src/storage/batch_extractor.cc | 12 +-
src/storage/batch_extractor.h | 9 +-
src/storage/iterator.cc | 8 +-
src/storage/iterator.h | 12 +-
src/storage/redis_db.cc | 12 +-
src/storage/redis_db.h | 3 +-
tests/gocase/integration/cluster/cluster_test.go | 4 +-
.../integration/slotimport/slotimport_test.go | 14 +-
.../integration/slotmigrate/slotmigrate_test.go | 178 +++++++++++++++++++--
tests/gocase/unit/geo/geo_test.go | 27 ++--
tests/gocase/unit/type/list/list_test.go | 3 -
tests/gocase/unit/type/zset/zset_test.go | 2 +-
tests/gocase/util/random.go | 40 +++++
21 files changed, 512 insertions(+), 221 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 8d373e0d..a38c0868 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -115,7 +115,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange>
&slot_ranges, const s
// 2. Add the slot into to-assign node
// 3. Update the map of slots to nodes.
// remember: The atomicity of the process is based on
- // the transactionality of ClearKeysOfSlot().
+ // the transactionality of ClearKeysOfSlotRange().
for (auto [s_start, s_end] : slot_ranges) {
for (int slot = s_start; slot <= s_end; slot++) {
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
@@ -129,7 +129,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange>
&slot_ranges, const s
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
- auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace,
slot);
+ auto s =
srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace,
SlotRange::GetPoint(slot));
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slot: " <<
s.ToString();
}
@@ -214,7 +214,7 @@ Status Cluster::SetClusterNodes(const std::string
&nodes_str, int64_t version, b
if (!migrated_slots_.empty()) {
for (const auto &[slot, _] : migrated_slots_) {
if (slots_nodes_[slot] != myself_) {
- auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
+ auto s = srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace,
SlotRange::GetPoint(slot));
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slots: " <<
s.ToString();
}
@@ -258,41 +258,53 @@ Status Cluster::SetMasterSlaveRepl() {
bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role !=
kClusterMaster || srv_->IsSlave(); }
-Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {
- if (!IsValidSlot(slot)) {
- return {Status::NotOK, errSlotOutOfRange};
+Status Cluster::SetSlotRangeMigrated(const SlotRange &slot_range, const
std::string &ip_port) {
+ if (!slot_range.IsValid()) {
+ return {Status::NotOK, errSlotRangeInvalid};
}
// It is called by slot-migrating thread which is an asynchronous thread.
// Therefore, it should be locked when a record is added to 'migrated_slots_'
// which will be accessed when executing commands.
auto exclusivity = srv_->WorkExclusivityGuard();
- migrated_slots_[slot] = ip_port;
+ for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
+ migrated_slots_[slot] = ip_port;
+ }
return Status::OK();
}
-Status Cluster::SetSlotImported(int slot) {
- if (!IsValidSlot(slot)) {
- return {Status::NotOK, errSlotOutOfRange};
+Status Cluster::SetSlotRangeImported(const SlotRange &slot_range) {
+ if (!slot_range.IsValid()) {
+ return {Status::NotOK, errSlotRangeInvalid};
}
// It is called by command 'cluster import'. When executing the command, the
// exclusive lock has been locked. Therefore, it can't be locked again.
- imported_slots_.insert(slot);
+ for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
+ imported_slots_.insert(slot);
+ }
return Status::OK();
}
-Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id,
SyncMigrateContext *blocking_ctx) {
+Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const
std::string &dst_node_id,
+ SyncMigrateContext *blocking_ctx) {
if (nodes_.find(dst_node_id) == nodes_.end()) {
return {Status::NotOK, "Can't find the destination node id"};
}
- if (!IsValidSlot(slot)) {
- return {Status::NotOK, errSlotOutOfRange};
+ if (!slot_range.IsValid()) {
+ return {Status::NotOK, errSlotRangeInvalid};
+ }
+
+ if (!migrated_slots_.empty() &&
+ slot_range.HasOverlap({migrated_slots_.begin()->first,
migrated_slots_.rbegin()->first})) {
+ return {Status::NotOK, "Can't migrate slot which has been migrated"};
}
- if (slots_nodes_[slot] != myself_) {
- return {Status::NotOK, "Can't migrate slot which doesn't belong to me"};
+ for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
+ if (slots_nodes_[slot] != myself_) {
+ return {Status::NotOK, "Can't migrate slot which doesn't belong to me"};
+ }
}
if (IsNotMaster()) {
@@ -308,51 +320,55 @@ Status Cluster::MigrateSlot(int slot, const std::string
&dst_node_id, SyncMigrat
}
const auto &dst = nodes_[dst_node_id];
- Status s = srv_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host,
dst->port, slot, blocking_ctx);
+ Status s =
+ srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst->host,
dst->port, slot_range, blocking_ctx);
return s;
}
-Status Cluster::ImportSlot(redis::Connection *conn, int slot, int state) {
+Status Cluster::ImportSlotRange(redis::Connection *conn, const SlotRange
&slot_range, int state) {
if (IsNotMaster()) {
return {Status::NotOK, "Slave can't import slot"};
}
- if (!IsValidSlot(slot)) {
- return {Status::NotOK, errSlotOutOfRange};
+ if (!slot_range.IsValid()) {
+ return {Status::NotOK, errSlotRangeInvalid};
}
- auto source_node = srv_->cluster->slots_nodes_[slot];
- if (source_node && source_node->id == myid_) {
- return {Status::NotOK, "Can't import slot which belongs to me"};
+
+ for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
+ auto source_node = srv_->cluster->slots_nodes_[slot];
+ if (source_node && source_node->id == myid_) {
+ return {Status::NotOK, "Can't import slot which belongs to me"};
+ }
}
Status s;
switch (state) {
case kImportStart:
- s = srv_->slot_import->Start(slot);
+ s = srv_->slot_import->Start(slot_range);
if (!s.IsOK()) return s;
// Set link importing
conn->SetImporting();
- myself_->importing_slot = slot;
+ myself_->importing_slot_range = slot_range;
// Set link error callback
- conn->close_cb = [object_ptr = srv_->slot_import.get(), slot](int fd) {
+ conn->close_cb = [object_ptr = srv_->slot_import.get(), slot_range](int
fd) {
auto s = object_ptr->StopForLinkError();
if (!s.IsOK()) {
- LOG(ERROR) << fmt::format("[import] Failed to stop importing slot
{}: {}", slot, s.Msg());
+ LOG(ERROR) << fmt::format("[import] Failed to stop importing slot(s)
{}: {}", slot_range.String(), s.Msg());
}
}; // Stop forbidding writing slot to accept write commands
- if (slot == srv_->slot_migrator->GetForbiddenSlot())
srv_->slot_migrator->ReleaseForbiddenSlot();
- LOG(INFO) << fmt::format("[import] Start importing slot {}", slot);
+ if (slot_range == srv_->slot_migrator->GetForbiddenSlotRange())
srv_->slot_migrator->ReleaseForbiddenSlotRange();
+ LOG(INFO) << fmt::format("[import] Start importing slot(s) {}",
slot_range.String());
break;
case kImportSuccess:
- s = srv_->slot_import->Success(slot);
+ s = srv_->slot_import->Success(slot_range);
if (!s.IsOK()) return s;
- LOG(INFO) << fmt::format("[import] Mark the importing slot {} as
succeed", slot);
+ LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as
succeed", slot_range.String());
break;
case kImportFailed:
- s = srv_->slot_import->Fail(slot);
+ s = srv_->slot_import->Fail(slot_range);
if (!s.IsOK()) return s;
- LOG(INFO) << fmt::format("[import] Mark the importing slot {} as
failed", slot);
+ LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as
failed", slot_range.String());
break;
default:
return {Status::NotOK, errInvalidImportState};
@@ -550,15 +566,16 @@ std::string Cluster::genNodesDescription() {
// Just for MYSELF node to show the importing/migrating slot
if (node->id == myid_) {
if (srv_->slot_migrator) {
- auto migrating_slot = srv_->slot_migrator->GetMigratingSlot();
- if (migrating_slot != -1) {
- node_str.append(fmt::format(" [{}->-{}]", migrating_slot,
srv_->slot_migrator->GetDstNode()));
+ auto migrating_slot_range =
srv_->slot_migrator->GetMigratingSlotRange();
+ if (migrating_slot_range.IsValid()) {
+ node_str.append(fmt::format(" [{}->-{}]",
migrating_slot_range.String(), srv_->slot_migrator->GetDstNode()));
}
}
if (srv_->slot_import) {
- auto importing_slot = srv_->slot_import->GetSlot();
- if (importing_slot != -1) {
- node_str.append(fmt::format(" [{}-<-{}]", importing_slot,
getNodeIDBySlot(importing_slot)));
+ auto importing_slot_range = srv_->slot_import->GetSlotRange();
+ if (importing_slot_range.IsValid()) {
+ node_str.append(
+ fmt::format(" [{}-<-{}]", importing_slot_range.String(),
getNodeIDBySlot(importing_slot_range.start)));
}
}
}
@@ -802,7 +819,9 @@ Status Cluster::parseClusterNodes(const std::string
&nodes_str, ClusterNodes *no
return Status::OK();
}
-bool Cluster::IsWriteForbiddenSlot(int slot) const { return
srv_->slot_migrator->GetForbiddenSlot() == slot; }
+bool Cluster::IsWriteForbiddenSlot(int slot) const {
+ return srv_->slot_migrator->GetForbiddenSlotRange().Contains(slot);
+}
Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes,
const std::vector<std::string> &cmd_tokens,
redis::Connection *conn) {
@@ -846,7 +865,7 @@ Status Cluster::CanExecByMySelf(const
redis::CommandAttributes *attributes, cons
return Status::OK(); // I'm serving this slot
}
- if (myself_ && myself_->importing_slot == slot &&
+ if (myself_ && myself_->importing_slot_range.Contains(slot) &&
(conn->IsImporting() ||
conn->IsFlagEnabled(redis::Connection::kAsking))) {
// While data migrating, the topology of the destination node has not been
changed.
// The destination node has to serve the requests from the migrating slot,
@@ -874,10 +893,10 @@ Status Cluster::CanExecByMySelf(const
redis::CommandAttributes *attributes, cons
// Only HARD mode is meaningful to the Kvrocks cluster,
// so it will force clearing all information after resetting.
Status Cluster::Reset() {
- if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) {
+ if (srv_->slot_migrator &&
srv_->slot_migrator->GetMigratingSlotRange().IsValid()) {
return {Status::NotOK, "Can't reset cluster while migrating slot"};
}
- if (srv_->slot_import && srv_->slot_import->GetSlot() != -1) {
+ if (srv_->slot_import && srv_->slot_import->GetSlotRange().IsValid()) {
return {Status::NotOK, "Can't reset cluster while importing slot"};
}
if (!srv_->storage->IsEmptyDB()) {
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 335d5ef1..e595666c 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -47,7 +47,7 @@ class ClusterNode {
std::string master_id;
std::bitset<kClusterSlots> slots;
std::vector<std::string> replicas;
- int importing_slot = -1;
+ SlotRange importing_slot_range = {-1, -1};
};
struct SlotInfo {
@@ -74,8 +74,8 @@ class Cluster {
StatusOr<std::string> GetReplicas(const std::string &node_id);
Status SetNodeId(const std::string &node_id);
Status SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const
std::string &node_id, int64_t version);
- Status SetSlotMigrated(int slot, const std::string &ip_port);
- Status SetSlotImported(int slot);
+ Status SetSlotRangeMigrated(const SlotRange &slot_range, const std::string
&ip_port);
+ Status SetSlotRangeImported(const SlotRange &slot_range);
Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
Status GetClusterInfo(std::string *cluster_infos);
int64_t GetVersion() const { return version_; }
@@ -85,8 +85,9 @@ class Cluster {
Status CanExecByMySelf(const redis::CommandAttributes *attributes, const
std::vector<std::string> &cmd_tokens,
redis::Connection *conn);
Status SetMasterSlaveRepl();
- Status MigrateSlot(int slot, const std::string &dst_node_id,
SyncMigrateContext *blocking_ctx = nullptr);
- Status ImportSlot(redis::Connection *conn, int slot, int state);
+ Status MigrateSlotRange(const SlotRange &slot_range, const std::string
&dst_node_id,
+ SyncMigrateContext *blocking_ctx = nullptr);
+ Status ImportSlotRange(redis::Connection *conn, const SlotRange &slot_range,
int state);
std::string GetMyId() const { return myid_; }
Status DumpClusterNodes(const std::string &file);
Status LoadClusterNodes(const std::string &file_path);
diff --git a/src/cluster/cluster_defs.h b/src/cluster/cluster_defs.h
index 0178ac75..12fa568d 100644
--- a/src/cluster/cluster_defs.h
+++ b/src/cluster/cluster_defs.h
@@ -21,6 +21,7 @@
#pragma once
#include "cluster/redis_slot.h"
+#include "fmt/format.h"
enum {
kClusterMaster = 1,
@@ -33,6 +34,7 @@ enum {
inline constexpr const char *errInvalidNodeID = "Invalid cluster node id";
inline constexpr const char *errInvalidSlotID = "Invalid slot id";
inline constexpr const char *errSlotOutOfRange = "Slot is out of range";
+inline constexpr const char *errSlotRangeInvalid = "Slot range is invalid";
inline constexpr const char *errInvalidClusterVersion = "Invalid cluster
version";
inline constexpr const char *errSlotOverlapped = "Slot distribution is
overlapped";
inline constexpr const char *errNoMasterNode = "The node isn't a master";
@@ -40,4 +42,35 @@ inline constexpr const char *errClusterNoInitialized = "The
cluster is not initi
inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster
nodes info";
inline constexpr const char *errInvalidImportState = "Invalid import state";
-using SlotRange = std::pair<int, int>;
+/// SlotRange is a range of cluster slots covering [start, end],
+/// where the valid values for start and end are [0, kClusterSlots).
+/// When both start and end are -1, it usually indicates an empty or wrong
SlotRange.
+struct SlotRange {
+ SlotRange(int start, int end) : start(start), end(end) {}
+ SlotRange() : start(-1), end(-1) {}
+ bool IsValid() const { return start >= 0 && end >= 0 && start <= end && end
< kClusterSlots; }
+
+ /// Contains is used to determine whether a slot is within the SlotRange.
+ /// Note that if the SlotRange is invalid, it will always return False.
+ bool Contains(int slot) const { return IsValid() && slot >= start && slot <=
end; }
+
+ /// HasOverlap is used to determine whether two SlotRanges overlap.
+ /// Note that if either SlotRange is invalid, it will always return False.
+ bool HasOverlap(const SlotRange &rhs) const {
+ return IsValid() && rhs.IsValid() && end >= rhs.start && rhs.end >= start;
+ }
+
+ std::string String() const {
+ if (!IsValid()) return "-1";
+ if (start == end) return fmt::format("{}", start);
+ return fmt::format("{}-{}", start, end);
+ }
+
+ static SlotRange GetPoint(int slot) { return {slot, slot}; }
+
+ bool operator==(const SlotRange &rhs) const { return start == rhs.start &&
end == rhs.end; }
+ bool operator!=(const SlotRange &rhs) const { return !(*this == rhs); }
+
+ int start;
+ int end;
+};
diff --git a/src/cluster/slot_import.cc b/src/cluster/slot_import.cc
index 4306e336..ef04aba7 100644
--- a/src/cluster/slot_import.cc
+++ b/src/cluster/slot_import.cc
@@ -21,59 +21,62 @@
#include "slot_import.h"
SlotImport::SlotImport(Server *srv)
- : Database(srv->storage, kDefaultNamespace), srv_(srv), import_slot_(-1),
import_status_(kImportNone) {
+ : Database(srv->storage, kDefaultNamespace), srv_(srv),
import_slot_range_(-1, -1), import_status_(kImportNone) {
std::lock_guard<std::mutex> guard(mutex_);
// Let metadata_cf_handle_ be nullptr, then get them in real time while use
them.
// See comments in SlotMigrator::SlotMigrator for detailed reason.
metadata_cf_handle_ = nullptr;
}
-Status SlotImport::Start(int slot) {
+Status SlotImport::Start(const SlotRange &slot_range) {
std::lock_guard<std::mutex> guard(mutex_);
if (import_status_ == kImportStart) {
// return ok if the same slot is importing
- if (import_slot_ == slot) {
+ if (import_slot_range_ == slot_range) {
return Status::OK();
}
- return {Status::NotOK, fmt::format("only one importing slot is allowed,
current slot is: {}", import_slot_)};
+ return {Status::NotOK,
+ fmt::format("only one importing job is allowed, current importing:
{}", import_slot_range_.String())};
}
// Clean slot data first
- auto s = ClearKeysOfSlot(namespace_, slot);
+ auto s = ClearKeysOfSlotRange(namespace_, slot_range);
if (!s.ok()) {
- return {Status::NotOK, fmt::format("clear keys of slot error: {}",
s.ToString())};
+ return {Status::NotOK, fmt::format("clear keys of slot(s) error: {}",
s.ToString())};
}
import_status_ = kImportStart;
- import_slot_ = slot;
+ import_slot_range_ = slot_range;
return Status::OK();
}
-Status SlotImport::Success(int slot) {
+Status SlotImport::Success(const SlotRange &slot_range) {
std::lock_guard<std::mutex> guard(mutex_);
- if (import_slot_ != slot) {
- return {Status::NotOK, fmt::format("mismatch slot, importing slot: {}, but
got: {}", import_slot_, slot)};
+ if (import_slot_range_ != slot_range) {
+ return {Status::NotOK, fmt::format("mismatch slot, importing slot(s): {},
but got: {}", import_slot_range_.String(),
+ slot_range.String())};
}
- Status s = srv_->cluster->SetSlotImported(import_slot_);
+ Status s = srv_->cluster->SetSlotRangeImported(import_slot_range_);
if (!s.IsOK()) {
- return {Status::NotOK, fmt::format("unable to set imported status: {}",
slot)};
+ return {Status::NotOK, fmt::format("unable to set imported status: {}",
slot_range.String())};
}
import_status_ = kImportSuccess;
return Status::OK();
}
-Status SlotImport::Fail(int slot) {
+Status SlotImport::Fail(const SlotRange &slot_range) {
std::lock_guard<std::mutex> guard(mutex_);
- if (import_slot_ != slot) {
- return {Status::NotOK, fmt::format("mismatch slot, importing slot: {}, but
got: {}", import_slot_, slot)};
+ if (import_slot_range_ != slot_range) {
+ return {Status::NotOK, fmt::format("mismatch slot, importing slot(s): {},
but got: {}", import_slot_range_.String(),
+ slot_range.String())};
}
// Clean imported slot data
- auto s = ClearKeysOfSlot(namespace_, slot);
+ auto s = ClearKeysOfSlotRange(namespace_, slot_range);
if (!s.ok()) {
- return {Status::NotOK, fmt::format("clear keys of slot error: {}",
s.ToString())};
+ return {Status::NotOK, fmt::format("clear keys of slot(s) error: {}",
s.ToString())};
}
import_status_ = kImportFailed;
@@ -96,7 +99,7 @@ Status SlotImport::StopForLinkError() {
// from new master.
if (!srv_->IsSlave()) {
// Clean imported slot data
- auto s = ClearKeysOfSlot(namespace_, import_slot_);
+ auto s = ClearKeysOfSlotRange(namespace_, import_slot_range_);
if (!s.ok()) {
return {Status::NotOK, fmt::format("clear keys of slot error: {}",
s.ToString())};
}
@@ -106,13 +109,13 @@ Status SlotImport::StopForLinkError() {
return Status::OK();
}
-int SlotImport::GetSlot() {
+SlotRange SlotImport::GetSlotRange() {
std::lock_guard<std::mutex> guard(mutex_);
// import_slot_ only be set when import_status_ is kImportStart
if (import_status_ != kImportStart) {
- return -1;
+ return {-1, -1};
}
- return import_slot_;
+ return import_slot_range_;
}
int SlotImport::GetStatus() {
@@ -123,7 +126,7 @@ int SlotImport::GetStatus() {
void SlotImport::GetImportInfo(std::string *info) {
std::lock_guard<std::mutex> guard(mutex_);
info->clear();
- if (import_slot_ < 0) {
+ if (!import_slot_range_.IsValid()) {
return;
}
@@ -145,5 +148,5 @@ void SlotImport::GetImportInfo(std::string *info) {
break;
}
- *info = fmt::format("importing_slot: {}\r\nimport_state: {}\r\n",
import_slot_, import_stat);
+ *info = fmt::format("importing_slot(s): {}\r\nimport_state: {}\r\n",
import_slot_range_.String(), import_stat);
}
diff --git a/src/cluster/slot_import.h b/src/cluster/slot_import.h
index 9f4bb72d..e3a5d32c 100644
--- a/src/cluster/slot_import.h
+++ b/src/cluster/slot_import.h
@@ -26,6 +26,7 @@
#include <string>
#include <vector>
+#include "cluster_defs.h"
#include "config/config.h"
#include "server/server.h"
#include "storage/redis_db.h"
@@ -42,17 +43,17 @@ class SlotImport : public redis::Database {
explicit SlotImport(Server *srv);
~SlotImport() = default;
- Status Start(int slot);
- Status Success(int slot);
- Status Fail(int slot);
+ Status Start(const SlotRange &slot_range);
+ Status Success(const SlotRange &slot_range);
+ Status Fail(const SlotRange &slot_range);
Status StopForLinkError();
- int GetSlot();
+ SlotRange GetSlotRange();
int GetStatus();
void GetImportInfo(std::string *info);
private:
Server *srv_ = nullptr;
std::mutex mutex_;
- int import_slot_;
+ SlotRange import_slot_range_;
int import_status_;
};
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 03625dc6..f91c9a1a 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -74,20 +74,20 @@ SlotMigrator::SlotMigrator(Server *srv)
}
}
-Status SlotMigrator::PerformSlotMigration(const std::string &node_id,
std::string &dst_ip, int dst_port, int slot_id,
- SyncMigrateContext *blocking_ctx) {
+Status SlotMigrator::PerformSlotRangeMigration(const std::string &node_id,
std::string &dst_ip, int dst_port,
+ const SlotRange &slot_range,
SyncMigrateContext *blocking_ctx) {
+ // TODO: concurrent migration, multiple migration jobs
// Only one slot migration job at the same time
- int16_t no_slot = -1;
- if (!migrating_slot_.compare_exchange_strong(no_slot,
static_cast<int16_t>(slot_id))) {
- return {Status::NotOK, "There is already a migrating slot"};
+ SlotRange empty_slot_range = {-1, -1};
+ if (!slot_range_.compare_exchange_strong(empty_slot_range, slot_range)) {
+ return {Status::NotOK, "There is already a migrating job"};
}
- if (forbidden_slot_ == slot_id) {
+ if (slot_range.HasOverlap(forbidden_slot_range_)) {
// Have to release migrate slot set above
- migrating_slot_ = -1;
+ slot_range_ = empty_slot_range;
return {Status::NotOK, "Can't migrate slot which has been migrated"};
}
-
migration_state_ = MigrationState::kStarted;
auto speed = srv_->GetConfig()->migrate_speed;
@@ -115,14 +115,14 @@ Status SlotMigrator::PerformSlotMigration(const
std::string &node_id, std::strin
dst_node_ = node_id;
// Create migration job
- auto job = std::make_unique<SlotMigrationJob>(slot_id, dst_ip, dst_port,
speed, pipeline_size, seq_gap);
+ auto job = std::make_unique<SlotMigrationJob>(slot_range, dst_ip, dst_port,
speed, pipeline_size, seq_gap);
{
std::lock_guard<std::mutex> guard(job_mutex_);
migration_job_ = std::move(job);
job_cv_.notify_one();
}
- LOG(INFO) << "[migrate] Start migrating slot " << slot_id << " to " <<
dst_ip << ":" << dst_port;
+ LOG(INFO) << "[migrate] Start migrating slot(s) " << slot_range.String() <<
" to " << dst_ip << ":" << dst_port;
return Status::OK();
}
@@ -159,8 +159,9 @@ void SlotMigrator::loop() {
return;
}
- LOG(INFO) << "[migrate] Migrating slot: " << migration_job_->slot_id << ",
dst_ip: " << migration_job_->dst_ip
- << ", dst_port: " << migration_job_->dst_port << ", max_speed: "
<< migration_job_->max_speed
+ LOG(INFO) << "[migrate] Migrating slot(s): " <<
migration_job_->slot_range.String()
+ << ", dst_ip: " << migration_job_->dst_ip << ", dst_port: " <<
migration_job_->dst_port
+ << ", max_speed: " << migration_job_->max_speed
<< ", max_pipeline_size: " << migration_job_->max_pipeline_size;
dst_ip_ = migration_job_->dst_ip;
@@ -187,10 +188,11 @@ void SlotMigrator::runMigrationProcess() {
case SlotMigrationStage::kStart: {
auto s = startMigration();
if (s.IsOK()) {
- LOG(INFO) << "[migrate] Succeed to start migrating slot " <<
migrating_slot_;
+ LOG(INFO) << "[migrate] Succeed to start migrating slot(s) " <<
slot_range_.load().String();
current_stage_ = SlotMigrationStage::kSnapshot;
} else {
- LOG(ERROR) << "[migrate] Failed to start migrating slot " <<
migrating_slot_ << ". Error: " << s.Msg();
+ LOG(ERROR) << "[migrate] Failed to start migrating slot(s) " <<
slot_range_.load().String()
+ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
@@ -201,7 +203,8 @@ void SlotMigrator::runMigrationProcess() {
if (s.IsOK()) {
current_stage_ = SlotMigrationStage::kWAL;
} else {
- LOG(ERROR) << "[migrate] Failed to send snapshot of slot " <<
migrating_slot_ << ". Error: " << s.Msg();
+ LOG(ERROR) << "[migrate] Failed to send snapshot of slot(s) " <<
slot_range_.load().String()
+ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
@@ -210,10 +213,11 @@ void SlotMigrator::runMigrationProcess() {
case SlotMigrationStage::kWAL: {
auto s = syncWAL();
if (s.IsOK()) {
- LOG(INFO) << "[migrate] Succeed to sync from WAL for a slot " <<
migrating_slot_;
+ LOG(INFO) << "[migrate] Succeed to sync from WAL for slot(s) " <<
slot_range_.load().String();
current_stage_ = SlotMigrationStage::kSuccess;
} else {
- LOG(ERROR) << "[migrate] Failed to sync from WAL for a slot " <<
migrating_slot_ << ". Error: " << s.Msg();
+ LOG(ERROR) << "[migrate] Failed to sync from WAL for slot(s) " <<
slot_range_.load().String()
+ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
@@ -222,12 +226,12 @@ void SlotMigrator::runMigrationProcess() {
case SlotMigrationStage::kSuccess: {
auto s = finishSuccessfulMigration();
if (s.IsOK()) {
- LOG(INFO) << "[migrate] Succeed to migrate slot " << migrating_slot_;
+ LOG(INFO) << "[migrate] Succeed to migrate slot(s) " <<
slot_range_.load().String();
current_stage_ = SlotMigrationStage::kClean;
migration_state_ = MigrationState::kSuccess;
resumeSyncCtx(s);
} else {
- LOG(ERROR) << "[migrate] Failed to finish a successful migration of
slot " << migrating_slot_
+ LOG(ERROR) << "[migrate] Failed to finish a successful migration of
slot(s) " << slot_range_.load().String()
<< ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
@@ -237,10 +241,10 @@ void SlotMigrator::runMigrationProcess() {
case SlotMigrationStage::kFailed: {
auto s = finishFailedMigration();
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to finish a failed migration of slot
" << migrating_slot_
+ LOG(ERROR) << "[migrate] Failed to finish a failed migration of
slot(s) " << slot_range_.load().String()
<< ". Error: " << s.Msg();
}
- LOG(INFO) << "[migrate] Failed to migrate a slot" << migrating_slot_;
+ LOG(INFO) << "[migrate] Failed to migrate a slot(s) " <<
slot_range_.load().String();
migration_state_ = MigrationState::kFailed;
current_stage_ = SlotMigrationStage::kClean;
break;
@@ -302,7 +306,8 @@ Status SlotMigrator::startMigration() {
}
}
- LOG(INFO) << "[migrate] Start migrating slot " << migrating_slot_ << ",
connect destination fd " << *dst_fd_;
+ LOG(INFO) << "[migrate] Start migrating slot(s) " <<
slot_range_.load().String() << ", connect destination fd "
+ << *dst_fd_;
return Status::OK();
}
@@ -330,13 +335,13 @@ Status SlotMigrator::sendSnapshotByCmd() {
uint64_t expired_key_cnt = 0;
uint64_t empty_key_cnt = 0;
std::string restore_cmds;
- int16_t slot = migrating_slot_;
+ SlotRange slot_range = slot_range_;
- LOG(INFO) << "[migrate] Start migrating snapshot of slot " << slot;
+ LOG(INFO) << "[migrate] Start migrating snapshot of slot(s)" <<
slot_range.String();
// Construct key prefix to iterate the keys belong to the target slot
- std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
- LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
+ std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
+ LOG(INFO) << "[migrate] Iterate keys of slot(s), key's prefix: " << prefix;
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
@@ -346,6 +351,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
auto iter =
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
// Seek to the beginning of keys start with 'prefix' and iterate all these
keys
+ auto current_slot = slot_range.start;
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
// The migrating task has to be stopped, if server role is changed from
master to slave
// or flush command (flushdb or flushall) is executed
@@ -354,7 +360,8 @@ Status SlotMigrator::sendSnapshotByCmd() {
}
// Iteration is out of range
- if (!iter->key().starts_with(prefix)) {
+ current_slot = ExtractSlotId(iter->key());
+ if (!slot_range.Contains(current_slot)) {
break;
}
@@ -384,8 +391,8 @@ Status SlotMigrator::sendSnapshotByCmd() {
if (auto s = iter->status(); !s.ok()) {
auto err_str = s.ToString();
- LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << slot << ": "
<< err_str;
- return {Status::NotOK, fmt::format("failed to iterate keys of slot {}:
{}", slot, err_str)};
+ LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << current_slot
<< ": " << err_str;
+ return {Status::NotOK, fmt::format("failed to iterate keys of slot {}:
{}", current_slot, err_str)};
}
// It's necessary to send commands that are still in the pipeline since the
final pipeline may not be sent
@@ -395,8 +402,9 @@ Status SlotMigrator::sendSnapshotByCmd() {
return s.Prefixed(errFailedToSendCommands);
}
- LOG(INFO) << "[migrate] Succeed to migrate slot snapshot, slot: " << slot <<
", Migrated keys: " << migrated_key_cnt
- << ", Expired keys: " << expired_key_cnt << ", Empty keys: " <<
empty_key_cnt;
+ LOG(INFO) << "[migrate] Succeed to migrate slot(s) snapshot, slot(s): " <<
slot_range.String()
+ << ", Migrated keys: " << migrated_key_cnt << ", Expired keys: "
<< expired_key_cnt
+ << ", Empty keys: " << empty_key_cnt;
return Status::OK();
}
@@ -408,7 +416,7 @@ Status SlotMigrator::syncWALByCmd() {
return s.Prefixed("failed to sync WAL before forbidding a slot");
}
- setForbiddenSlot(migrating_slot_);
+ setForbiddenSlotRange(slot_range_);
// Send last incremental data
s = syncWalAfterForbiddingSlot();
@@ -431,20 +439,21 @@ Status SlotMigrator::finishSuccessfulMigration() {
}
std::string dst_ip_port = dst_ip_ + ":" + std::to_string(dst_port_);
- s = srv_->cluster->SetSlotMigrated(migrating_slot_, dst_ip_port);
+ s = srv_->cluster->SetSlotRangeMigrated(slot_range_, dst_ip_port);
if (!s.IsOK()) {
- return s.Prefixed(fmt::format("failed to set slot {} as migrated to {}",
migrating_slot_.load(), dst_ip_port));
+ return s.Prefixed(
+ fmt::format("failed to set slot(s) {} as migrated to {}",
slot_range_.load().String(), dst_ip_port));
}
- migrate_failed_slot_ = -1;
+ migrate_failed_slot_range_ = {-1, -1};
return Status::OK();
}
Status SlotMigrator::finishFailedMigration() {
// Stop slot will forbid writing
- migrate_failed_slot_ = migrating_slot_;
- forbidden_slot_ = -1;
+ migrate_failed_slot_range_ = slot_range_.load();
+ forbidden_slot_range_ = {-1, -1};
// Set import status on the destination node to FAILED
auto s = setImportStatusOnDstNode(*dst_fd_, kImportFailed);
@@ -456,7 +465,7 @@ Status SlotMigrator::finishFailedMigration() {
}
void SlotMigrator::clean() {
- LOG(INFO) << "[migrate] Clean resources of migrating slot " <<
migrating_slot_;
+ LOG(INFO) << "[migrate] Clean resources of migrating slot(s) " <<
slot_range_.load().String();
if (slot_snapshot_) {
storage_->GetDB()->ReleaseSnapshot(slot_snapshot_);
slot_snapshot_ = nullptr;
@@ -468,7 +477,7 @@ void SlotMigrator::clean() {
std::lock_guard<std::mutex> guard(job_mutex_);
migration_job_.reset();
dst_fd_.Reset();
- migrating_slot_ = -1;
+ slot_range_ = {-1, -1};
SetStopMigrationFlag(false);
}
@@ -491,7 +500,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd,
int status) {
if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};
std::string cmd =
- redis::ArrayOfBulkStrings({"cluster", "import",
std::to_string(migrating_slot_), std::to_string(status)});
+ redis::ArrayOfBulkStrings({"cluster", "import",
slot_range_.load().String(), std::to_string(status)});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send command to the destination node");
@@ -989,21 +998,21 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string
*commands, bool need) {
return Status::OK();
}
-void SlotMigrator::setForbiddenSlot(int16_t slot) {
- LOG(INFO) << "[migrate] Setting forbidden slot " << slot;
+void SlotMigrator::setForbiddenSlotRange(const SlotRange &slot_range) {
+ LOG(INFO) << "[migrate] Setting forbidden slot(s) " << slot_range.String();
// Block server to set forbidden slot
uint64_t during = util::GetTimeStampUS();
{
auto exclusivity = srv_->WorkExclusivityGuard();
- forbidden_slot_ = slot;
+ forbidden_slot_range_ = slot_range;
}
during = util::GetTimeStampUS() - during;
LOG(INFO) << "[migrate] To set forbidden slot, server was blocked for " <<
during << "us";
}
-void SlotMigrator::ReleaseForbiddenSlot() {
- LOG(INFO) << "[migrate] Release forbidden slot " << forbidden_slot_;
- forbidden_slot_ = -1;
+void SlotMigrator::ReleaseForbiddenSlotRange() {
+ LOG(INFO) << "[migrate] Release forbidden slot(s) " <<
forbidden_slot_range_.load().String();
+ forbidden_slot_range_ = {-1, -1};
}
void SlotMigrator::applyMigrationSpeedLimit() const {
@@ -1023,7 +1032,7 @@ void SlotMigrator::applyMigrationSpeedLimit() const {
Status SlotMigrator::generateCmdsFromBatch(rocksdb::BatchResult *batch,
std::string *commands) {
// Iterate batch to get keys and construct commands for keys
- WriteBatchExtractor write_batch_extractor(storage_->IsSlotIdEncoded(),
migrating_slot_, false);
+ WriteBatchExtractor write_batch_extractor(storage_->IsSlotIdEncoded(),
slot_range_, false);
rocksdb::Status status =
batch->writeBatchPtr->Iterate(&write_batch_extractor);
if (!status.ok()) {
LOG(ERROR) << "[migrate] Failed to parse write batch, Err: " <<
status.ToString();
@@ -1163,11 +1172,12 @@ Status SlotMigrator::syncWalAfterForbiddingSlot() {
void SlotMigrator::GetMigrationInfo(std::string *info) const {
info->clear();
- if (migrating_slot_ < 0 && forbidden_slot_ < 0 && migrate_failed_slot_ < 0) {
+ if (!slot_range_.load().IsValid() && !forbidden_slot_range_.load().IsValid()
&&
+ !migrate_failed_slot_range_.load().IsValid()) {
return;
}
- int16_t slot = -1;
+ SlotRange slot_range;
std::string task_state;
switch (migration_state_.load()) {
@@ -1176,22 +1186,22 @@ void SlotMigrator::GetMigrationInfo(std::string *info)
const {
break;
case MigrationState::kStarted:
task_state = "start";
- slot = migrating_slot_;
+ slot_range = slot_range_;
break;
case MigrationState::kSuccess:
task_state = "success";
- slot = forbidden_slot_;
+ slot_range = forbidden_slot_range_;
break;
case MigrationState::kFailed:
task_state = "fail";
- slot = migrate_failed_slot_;
+ slot_range = migrate_failed_slot_range_;
break;
default:
break;
}
- *info =
- fmt::format("migrating_slot: {}\r\ndestination_node:
{}\r\nmigrating_state: {}\r\n", slot, dst_node_, task_state);
+ *info = fmt::format("migrating_slot(s): {}\r\ndestination_node:
{}\r\nmigrating_state: {}\r\n", slot_range.String(),
+ dst_node_, task_state);
}
void SlotMigrator::CancelSyncCtx() {
@@ -1217,9 +1227,10 @@ Status SlotMigrator::sendMigrationBatch(BatchSender
*batch) {
Status SlotMigrator::sendSnapshotByRawKV() {
uint64_t start_ts = util::GetTimeStampMS();
- LOG(INFO) << "[migrate] Migrating snapshot of slot " << migrating_slot_ << "
by raw key value";
+ auto slot_range = slot_range_.load();
+ LOG(INFO) << "[migrate] Migrating snapshot of slot(s) " <<
slot_range.String() << " by raw key value";
- auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
+ auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
rocksdb::Slice prefix_slice(prefix);
@@ -1228,7 +1239,13 @@ Status SlotMigrator::sendSnapshotByRawKV() {
BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_,
migrate_batch_bytes_per_sec_);
- for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix);
iter.Next()) {
+ for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
+ // Iteration is out of range
+ auto key_slot_id = ExtractSlotId(iter.Key());
+ if (!slot_range.Contains(key_slot_id)) {
+ break;
+ }
+
auto redis_type = iter.Type();
std::string log_data;
if (redis_type == RedisType::kRedisList) {
@@ -1273,9 +1290,9 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto elapsed = util::GetTimeStampMS() - start_ts;
LOG(INFO) << fmt::format(
- "[migrate] Succeed to migrate snapshot, slot: {}, elapsed: {} ms, "
+ "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {}
ms, "
"sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}",
- migrating_slot_.load(), elapsed, batch_sender.GetSentBytes(),
batch_sender.GetRate(start_ts),
+ slot_range.String(), elapsed, batch_sender.GetSentBytes(),
batch_sender.GetRate(start_ts),
batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
return Status::OK();
@@ -1283,7 +1300,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {
Status SlotMigrator::syncWALByRawKV() {
uint64_t start_ts = util::GetTimeStampMS();
- LOG(INFO) << "[migrate] Syncing WAL of slot " << migrating_slot_ << " by raw
key value";
+ LOG(INFO) << "[migrate] Syncing WAL of slot(s) " <<
slot_range_.load().String() << " by raw key value";
BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_,
migrate_batch_bytes_per_sec_);
int epoch = 1;
@@ -1304,7 +1321,7 @@ Status SlotMigrator::syncWALByRawKV() {
epoch++;
}
- setForbiddenSlot(migrating_slot_);
+ setForbiddenSlotRange(slot_range_);
wal_incremental_seq = storage_->GetDB()->GetLatestSequenceNumber();
if (wal_incremental_seq > wal_begin_seq_) {
@@ -1318,9 +1335,9 @@ Status SlotMigrator::syncWALByRawKV() {
auto elapsed = util::GetTimeStampMS() - start_ts;
LOG(INFO) << fmt::format(
- "[migrate] Succeed to migrate incremental data, slot: {}, elapsed: {}
ms, "
+ "[migrate] Succeed to migrate incremental data, slot(s): {}, elapsed: {}
ms, "
"sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}",
- migrating_slot_.load(), elapsed, batch_sender.GetSentBytes(),
batch_sender.GetRate(start_ts),
+ slot_range_.load().String(), elapsed, batch_sender.GetSentBytes(),
batch_sender.GetRate(start_ts),
batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
return Status::OK();
@@ -1329,15 +1346,16 @@ Status SlotMigrator::syncWALByRawKV() {
bool SlotMigrator::catchUpIncrementalWAL() {
uint64_t gap = storage_->GetDB()->GetLatestSequenceNumber() - wal_begin_seq_;
if (gap <= seq_gap_limit_) {
- LOG(INFO) << fmt::format("[migrate] Incremental data sequence gap: {},
less than limit: {}, set forbidden slot: {}",
- gap, seq_gap_limit_, migrating_slot_.load());
+ LOG(INFO) << fmt::format(
+ "[migrate] Incremental data sequence gap: {}, less than limit: {}, set
forbidden slot(s): {}", gap,
+ seq_gap_limit_, slot_range_.load().String());
return true;
}
return false;
}
Status SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq,
BatchSender *batch_sender) {
- engine::WALIterator wal_iter(storage_, migrating_slot_);
+ engine::WALIterator wal_iter(storage_, slot_range_);
uint64_t start_seq = wal_begin_seq_ + 1;
for (wal_iter.Seek(start_seq); wal_iter.Valid(); wal_iter.Next()) {
if (wal_iter.NextSequenceNumber() > end_seq + 1) {
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index e22ba47d..179150b8 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -55,8 +55,9 @@ enum class SlotMigrationStage { kNone, kStart, kSnapshot,
kWAL, kSuccess, kFaile
enum class KeyMigrationResult { kMigrated, kExpired, kUnderlyingStructEmpty };
struct SlotMigrationJob {
- SlotMigrationJob(int slot_id, std::string dst_ip, int dst_port, int speed,
int pipeline_size, int seq_gap)
- : slot_id(static_cast<int16_t>(slot_id)),
+ SlotMigrationJob(const SlotRange &slot_range_in, std::string dst_ip, int
dst_port, int speed, int pipeline_size,
+ int seq_gap)
+ : slot_range(slot_range_in),
dst_ip(std::move(dst_ip)),
dst_port(dst_port),
max_speed(speed),
@@ -66,7 +67,7 @@ struct SlotMigrationJob {
SlotMigrationJob &operator=(const SlotMigrationJob &other) = delete;
~SlotMigrationJob() = default;
- int16_t slot_id;
+ SlotRange slot_range;
std::string dst_ip;
int dst_port;
int max_speed;
@@ -84,9 +85,9 @@ class SlotMigrator : public redis::Database {
~SlotMigrator();
Status CreateMigrationThread();
- Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip,
int dst_port, int slot_id,
- SyncMigrateContext *blocking_ctx = nullptr);
- void ReleaseForbiddenSlot();
+ Status PerformSlotRangeMigration(const std::string &node_id, std::string
&dst_ip, int dst_port,
+ const SlotRange &range, SyncMigrateContext
*blocking_ctx = nullptr);
+ void ReleaseForbiddenSlotRange();
void SetMaxMigrationSpeed(int value) {
if (value >= 0) max_migration_speed_ = value;
}
@@ -101,8 +102,8 @@ class SlotMigrator : public redis::Database {
void SetStopMigrationFlag(bool value) { stop_migration_ = value; }
bool IsMigrationInProgress() const { return migration_state_ ==
MigrationState::kStarted; }
SlotMigrationStage GetCurrentSlotMigrationStage() const { return
current_stage_; }
- int16_t GetForbiddenSlot() const { return forbidden_slot_; }
- int16_t GetMigratingSlot() const { return migrating_slot_; }
+ SlotRange GetForbiddenSlotRange() const { return forbidden_slot_range_; }
+ SlotRange GetMigratingSlotRange() const { return slot_range_; }
std::string GetDstNode() const { return dst_node_; }
void GetMigrationInfo(std::string *info) const;
void CancelSyncCtx();
@@ -149,7 +150,7 @@ class SlotMigrator : public redis::Database {
bool catchUpIncrementalWAL();
Status migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender
*batch_sender);
- void setForbiddenSlot(int16_t slot);
+ void setForbiddenSlotRange(const SlotRange &slot_range);
std::unique_lock<std::mutex> blockingLock() { return
std::unique_lock<std::mutex>(blocking_mutex_); }
void resumeSyncCtx(const Status &migrate_result);
@@ -190,9 +191,12 @@ class SlotMigrator : public redis::Database {
UniqueFD dst_fd_;
MigrationType migration_type_ = MigrationType::kRedisCommand;
- std::atomic<int16_t> forbidden_slot_ = -1;
- std::atomic<int16_t> migrating_slot_ = -1;
- int16_t migrate_failed_slot_ = -1;
+
+ static_assert(std::atomic<SlotRange>::is_always_lock_free, "SlotRange is not
lock free.");
+ std::atomic<SlotRange> forbidden_slot_range_ = SlotRange{-1, -1};
+ std::atomic<SlotRange> slot_range_ = SlotRange{-1, -1};
+ std::atomic<SlotRange> migrate_failed_slot_range_ = SlotRange{-1, -1};
+
std::atomic<bool> stop_migration_ = false; // if is true migration will be
stopped but the thread won't be destroyed
const rocksdb::Snapshot *slot_snapshot_ = nullptr;
uint64_t wal_begin_seq_ = 0;
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index 0f9b1760..e4fb2ded 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -47,7 +47,11 @@ class CommandCluster : public Commander {
if (subcommand_ == "import") {
if (args.size() != 4) return {Status::RedisParseErr,
errWrongNumOfArguments};
- slot_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10));
+
+ Status s = CommandTable::ParseSlotRanges(args_[2], slot_ranges_);
+ if (!s.IsOK()) {
+ return s;
+ }
auto state = ParseInt<unsigned>(args[3], {kImportStart, kImportNone},
10);
if (!state) return {Status::NotOK, "Invalid import state"};
@@ -109,7 +113,8 @@ class CommandCluster : public Commander {
return s;
}
} else if (subcommand_ == "import") {
- Status s = srv->cluster->ImportSlot(conn, static_cast<int>(slot_),
state_);
+ // TODO: support multiple slot ranges
+ Status s = srv->cluster->ImportSlotRange(conn, slot_ranges_[0], state_);
if (s.IsOK()) {
*output = redis::SimpleString("OK");
} else {
@@ -138,7 +143,7 @@ class CommandCluster : public Commander {
private:
std::string subcommand_;
- int64_t slot_ = -1;
+ std::vector<SlotRange> slot_ranges_;
ImportStatus state_ = kImportNone;
};
@@ -154,7 +159,10 @@ class CommandClusterX : public Commander {
if (subcommand_ == "migrate") {
if (args.size() < 4 || args.size() > 6) return {Status::RedisParseErr,
errWrongNumOfArguments};
- slot_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10));
+ Status s = CommandTable::ParseSlotRanges(args_[2], slot_ranges_);
+ if (!s.IsOK()) {
+ return s;
+ }
dst_node_id_ = args[3];
@@ -279,8 +287,8 @@ class CommandClusterX : public Commander {
if (sync_migrate_) {
sync_migrate_ctx_ = std::make_unique<SyncMigrateContext>(srv, conn,
sync_migrate_timeout_);
}
-
- Status s = srv->cluster->MigrateSlot(static_cast<int>(slot_),
dst_node_id_, sync_migrate_ctx_.get());
+ // TODO: support multiple slot ranges
+ Status s = srv->cluster->MigrateSlotRange(slot_ranges_[0], dst_node_id_,
sync_migrate_ctx_.get());
if (s.IsOK()) {
if (sync_migrate_) {
return {Status::BlockingCmd};
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index c1559b1a..e9ac4925 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -53,7 +53,8 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key,
is_slot_id_encoded_);
- if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) !=
GetSlotIdFromKey(user_key)) {
+ auto key_slot_id = GetSlotIdFromKey(user_key);
+ if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
@@ -112,7 +113,8 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey ikey(key, is_slot_id_encoded_);
user_key = ikey.GetKey().ToString();
- if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) !=
GetSlotIdFromKey(user_key)) {
+ auto key_slot_id = GetSlotIdFromKey(user_key);
+ if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
@@ -279,7 +281,8 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
std::string user_key;
std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key,
is_slot_id_encoded_);
- if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) !=
GetSlotIdFromKey(user_key)) {
+ auto key_slot_id = GetSlotIdFromKey(user_key);
+ if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
@@ -287,7 +290,8 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
} else if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
InternalKey ikey(key, is_slot_id_encoded_);
std::string user_key = ikey.GetKey().ToString();
- if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) !=
GetSlotIdFromKey(user_key)) {
+ auto key_slot_id = GetSlotIdFromKey(user_key);
+ if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
diff --git a/src/storage/batch_extractor.h b/src/storage/batch_extractor.h
index 694cc3bf..4f577504 100644
--- a/src/storage/batch_extractor.h
+++ b/src/storage/batch_extractor.h
@@ -24,6 +24,7 @@
#include <string>
#include <vector>
+#include "cluster/cluster_defs.h"
#include "redis_db.h"
#include "redis_metadata.h"
#include "status.h"
@@ -32,8 +33,10 @@
// An extractor to extract update from raw write batch
class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
public:
- explicit WriteBatchExtractor(bool is_slot_id_encoded, int16_t slot_id = -1,
bool to_redis = false)
- : is_slot_id_encoded_(is_slot_id_encoded), slot_id_(slot_id),
to_redis_(to_redis) {}
+ explicit WriteBatchExtractor(bool is_slot_id_encoded, int slot = -1, bool
to_redis = false)
+ : is_slot_id_encoded_(is_slot_id_encoded), slot_range_(slot, slot),
to_redis_(to_redis) {}
+ explicit WriteBatchExtractor(bool is_slot_id_encoded, const SlotRange
&slot_range, bool to_redis = false)
+ : is_slot_id_encoded_(is_slot_id_encoded), slot_range_(slot_range),
to_redis_(to_redis) {}
void LogData(const rocksdb::Slice &blob) override;
rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const
Slice &value) override;
@@ -49,6 +52,6 @@ class WriteBatchExtractor : public
rocksdb::WriteBatch::Handler {
redis::WriteBatchLogData log_data_;
bool first_seen_ = true;
bool is_slot_id_encoded_ = false;
- int slot_id_;
+ SlotRange slot_range_;
bool to_redis_;
};
diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc
index c65dd3ab..e368c679 100644
--- a/src/storage/iterator.cc
+++ b/src/storage/iterator.cc
@@ -167,7 +167,8 @@ void SubKeyIterator::Reset() {
}
rocksdb::Status WALBatchExtractor::PutCF(uint32_t column_family_id, const
Slice &key, const Slice &value) {
- if (slot_ != -1 && slot_ != ExtractSlotId(key)) {
+ auto key_slot_id = ExtractSlotId(key);
+ if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
items_.emplace_back(WALItem::Type::kTypePut, column_family_id,
key.ToString(), value.ToString());
@@ -175,7 +176,8 @@ rocksdb::Status WALBatchExtractor::PutCF(uint32_t
column_family_id, const Slice
}
rocksdb::Status WALBatchExtractor::DeleteCF(uint32_t column_family_id, const
rocksdb::Slice &key) {
- if (slot_ != -1 && slot_ != ExtractSlotId(key)) {
+ auto key_slot_id = ExtractSlotId(key);
+ if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
items_.emplace_back(WALItem::Type::kTypeDelete, column_family_id,
key.ToString(), std::string{});
@@ -245,7 +247,7 @@ void WALIterator::nextBatch() {
}
void WALIterator::Seek(rocksdb::SequenceNumber seq) {
- if (slot_ != -1 && !storage_->IsSlotIdEncoded()) {
+ if (slot_range_.IsValid() && !storage_->IsSlotIdEncoded()) {
Reset();
return;
}
diff --git a/src/storage/iterator.h b/src/storage/iterator.h
index 2f123630..34f96f6f 100644
--- a/src/storage/iterator.h
+++ b/src/storage/iterator.h
@@ -22,6 +22,7 @@
#include <rocksdb/iterator.h>
#include <rocksdb/options.h>
+#include "cluster/cluster_defs.h"
#include "storage.h"
namespace engine {
@@ -102,7 +103,8 @@ struct WALItem {
class WALBatchExtractor : public rocksdb::WriteBatch::Handler {
public:
// If set slot, storage must enable slot id encoding
- explicit WALBatchExtractor(int slot = -1) : slot_(slot) {}
+ explicit WALBatchExtractor(int slot = -1) : slot_range_(slot, slot) {}
+ explicit WALBatchExtractor(const SlotRange &slot_range) :
slot_range_(slot_range) {}
rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const
Slice &value) override;
@@ -133,13 +135,15 @@ class WALBatchExtractor : public
rocksdb::WriteBatch::Handler {
private:
std::vector<WALItem> items_;
- int slot_;
+ SlotRange slot_range_;
};
class WALIterator {
public:
+ explicit WALIterator(engine::Storage *storage, const SlotRange &slot_range)
+ : storage_(storage), slot_range_(slot_range), extractor_(slot_range),
next_batch_seq_(0){};
explicit WALIterator(engine::Storage *storage, int slot = -1)
- : storage_(storage), slot_(slot), extractor_(slot), next_batch_seq_(0){};
+ : storage_(storage), slot_range_(slot, slot), extractor_(slot),
next_batch_seq_(0){};
~WALIterator() = default;
bool Valid() const;
@@ -154,7 +158,7 @@ class WALIterator {
void nextBatch();
engine::Storage *storage_;
- int slot_;
+ SlotRange slot_range_;
std::unique_ptr<rocksdb::TransactionLogIterator> iter_;
WALBatchExtractor extractor_;
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 0fd31137..b8c46458 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -529,18 +529,14 @@ std::string Database::AppendNamespacePrefix(const Slice
&user_key) {
return ComposeNamespaceKey(namespace_, user_key,
storage_->IsSlotIdEncoded());
}
-rocksdb::Status Database::ClearKeysOfSlot(const rocksdb::Slice &ns, int slot) {
+rocksdb::Status Database::ClearKeysOfSlotRange(const rocksdb::Slice &ns, const
SlotRange &slot_range) {
if (!storage_->IsSlotIdEncoded()) {
return rocksdb::Status::Aborted("It is not in cluster mode");
}
- std::string prefix = ComposeSlotKeyPrefix(ns, slot);
- std::string prefix_end = ComposeSlotKeyPrefix(ns, slot + 1);
- auto s = storage_->DeleteRange(prefix, prefix_end);
- if (!s.ok()) {
- return s;
- }
- return rocksdb::Status::OK();
+ std::string prefix = ComposeSlotKeyPrefix(ns, slot_range.start);
+ std::string prefix_end = ComposeSlotKeyPrefix(ns, slot_range.end + 1);
+ return storage_->DeleteRange(prefix, prefix_end);
}
rocksdb::Status Database::KeyExist(const std::string &key) {
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 12d9896b..289d2a2a 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -27,6 +27,7 @@
#include <variant>
#include <vector>
+#include "cluster/cluster_defs.h"
#include "redis_metadata.h"
#include "server/redis_reply.h"
#include "storage.h"
@@ -134,7 +135,7 @@ class Database {
RedisType type = kRedisNone);
[[nodiscard]] rocksdb::Status RandomKey(const std::string &cursor,
std::string *key);
std::string AppendNamespacePrefix(const Slice &user_key);
- [[nodiscard]] rocksdb::Status ClearKeysOfSlot(const rocksdb::Slice &ns, int
slot);
+ [[nodiscard]] rocksdb::Status ClearKeysOfSlotRange(const rocksdb::Slice &ns,
const SlotRange &slot_range);
[[nodiscard]] rocksdb::Status KeyExist(const std::string &key);
// Copy <key,value> to <new_key,value> (already an internal key)
diff --git a/tests/gocase/integration/cluster/cluster_test.go
b/tests/gocase/integration/cluster/cluster_test.go
index fad7d012..541c2a99 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -511,7 +511,7 @@ func TestClusterReset(t *testing.T) {
slotNum := 1
require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import",
slotNum, 0).Val())
clusterInfo := rdb1.ClusterInfo(ctx).Val()
- require.Contains(t, clusterInfo, "importing_slot: 1")
+ require.Contains(t, clusterInfo, "importing_slot(s): 1")
require.Contains(t, clusterInfo, "import_state: start")
require.Contains(t, rdb1.ClusterResetHard(ctx).Err(), "Can't
reset cluster while importing slot")
require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import",
slotNum, 1).Val())
@@ -533,7 +533,7 @@ func TestClusterReset(t *testing.T) {
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
slotNum, id1).Val())
clusterInfo := rdb0.ClusterInfo(ctx).Val()
- require.Contains(t, clusterInfo, "migrating_slot: 2")
+ require.Contains(t, clusterInfo, "migrating_slot(s): 2")
require.Contains(t, clusterInfo, "migrating_state: start")
require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't
reset cluster while migrating slot")
diff --git a/tests/gocase/integration/slotimport/slotimport_test.go
b/tests/gocase/integration/slotimport/slotimport_test.go
index a3566cab..7b199280 100644
--- a/tests/gocase/integration/slotimport/slotimport_test.go
+++ b/tests/gocase/integration/slotimport/slotimport_test.go
@@ -83,8 +83,8 @@ func TestImportedServer(t *testing.T) {
require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
t.Run("IMPORT - error slot", func(t *testing.T) {
- require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", -1,
0).Err(), "Slot is out of range")
- require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import",
16384, 0).Err(), "Slot is out of range")
+ require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", -1,
0).Err(), "Invalid slot range")
+ require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import",
16384, 0).Err(), "Invalid slot id: out of numeric range")
})
t.Run("IMPORT - slot with error state", func(t *testing.T) {
@@ -106,7 +106,7 @@ func TestImportedServer(t *testing.T) {
require.Equal(t, "slot1", rdbA.Get(ctx, slotKey).Val())
require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import",
slotNum, 0).Val())
clusterInfo := rdbB.ClusterInfo(ctx).Val()
- require.Contains(t, clusterInfo, "importing_slot: 1")
+ require.Contains(t, clusterInfo, "importing_slot(s): 1")
require.Contains(t, clusterInfo, "import_state: start")
clusterNodes := rdbB.ClusterNodes(ctx).Val()
require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]",
slotNum, srvAID))
@@ -114,14 +114,14 @@ func TestImportedServer(t *testing.T) {
require.NoError(t, rdbA.Do(ctx, "clusterx", "migrate", slotNum,
srvBID).Err())
require.Eventually(t, func() bool {
clusterInfo :=
rdbA.ClusterInfo(context.Background()).Val()
- return strings.Contains(clusterInfo,
fmt.Sprintf("migrating_slot: %d", slotNum)) &&
+ return strings.Contains(clusterInfo,
fmt.Sprintf("migrating_slot(s): %d", slotNum)) &&
strings.Contains(clusterInfo,
fmt.Sprintf("migrating_state: %s", "success"))
}, 5*time.Second, 100*time.Millisecond)
// import success
require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import",
slotNum, 1).Val())
clusterInfo = rdbB.ClusterInfo(ctx).Val()
- require.Contains(t, clusterInfo, "importing_slot: 1")
+ require.Contains(t, clusterInfo, "importing_slot(s): 1")
require.Contains(t, clusterInfo, "import_state: success")
// import finish and should not contain the import section
@@ -145,7 +145,7 @@ func TestImportedServer(t *testing.T) {
time.Sleep(50 * time.Millisecond)
clusterInfo := rdbB.ClusterInfo(ctx).Val()
- require.Contains(t, clusterInfo, "importing_slot: 10")
+ require.Contains(t, clusterInfo, "importing_slot(s): 10")
require.Contains(t, clusterInfo, "import_state: error")
// get empty
@@ -165,7 +165,7 @@ func TestImportedServer(t *testing.T) {
time.Sleep(50 * time.Millisecond)
clusterInfo := rdbB.ClusterInfo(ctx).Val()
- require.Contains(t, clusterInfo, "importing_slot: 11")
+ require.Contains(t, clusterInfo, "importing_slot(s): 11")
require.Contains(t, clusterInfo, "import_state: error")
// get empty
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index f54a428f..a0cd1f86 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -108,12 +108,6 @@ func TestSlotMigrateDestServerKilled(t *testing.T) {
clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1,
srv1.Host(), srv1.Port())
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
-
- t.Run("MIGRATE - Slot is out of range", func(t *testing.T) {
- require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
-1, id1).Err(), "Slot is out of range")
- require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
16384, id1).Err(), "Slot is out of range")
- })
-
t.Run("MIGRATE - Cannot migrate slot to itself", func(t *testing.T) {
require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 1,
id0).Err(), "Can't migrate slot to myself")
})
@@ -482,7 +476,7 @@ func TestSlotMigrateSync(t *testing.T) {
err := rdb0.Do(ctx, "clusterx", "migrate", slot, id1,
"sync", timeout).Err()
// go-redis will auto-retry if occurs timeout error, so
it may return the already migrated slot error,
// so we just allow this error here to prevent the
flaky test failure.
- if err != nil && !strings.Contains(err.Error(), "ERR
There is already a migrating slot") {
+ if err != nil && !strings.Contains(err.Error(), "ERR
There is already a migrating job") {
require.NoError(t, err)
}
}
@@ -531,7 +525,7 @@ func TestSlotMigrateDataType(t *testing.T) {
}
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
slot, id1).Val())
otherSlot := 2
- require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
otherSlot, id1).Err(), "There is already a migrating slot")
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
otherSlot, id1).Err(), "There is already a migrating job")
waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)
require.EqualValues(t, cnt, rdb1.LLen(ctx,
util.SlotTable[slot]).Val())
})
@@ -1168,24 +1162,186 @@ func waitForMigrateState(t testing.TB, client
*redis.Client, slot int, state Slo
waitForMigrateStateInDuration(t, client, slot, state, 5*time.Second)
}
+func waitForMigrateSlotRangeState(t testing.TB, client *redis.Client,
slotRange string, state SlotMigrationState) {
+ waitForMigrateSlotRangeStateInDuration(t, client, slotRange, state,
5*time.Second)
+}
+
func waitForMigrateStateInDuration(t testing.TB, client *redis.Client, slot
int, state SlotMigrationState, d time.Duration) {
require.Eventually(t, func() bool {
i := client.ClusterInfo(context.Background()).Val()
- return strings.Contains(i, fmt.Sprintf("migrating_slot: %d",
slot)) &&
+ return strings.Contains(i, fmt.Sprintf("migrating_slot(s): %d",
slot)) &&
+ strings.Contains(i, fmt.Sprintf("migrating_state: %s",
state))
+ }, d, 100*time.Millisecond)
+}
+
+func waitForMigrateSlotRangeStateInDuration(t testing.TB, client
*redis.Client, slotRange string, state SlotMigrationState, d time.Duration) {
+ slots := strings.Split(slotRange, "-")
+ if len(slots) == 2 && slots[0] == slots[1] {
+ slotRange = slots[0]
+ }
+ require.Eventually(t, func() bool {
+ i := client.ClusterInfo(context.Background()).Val()
+ return strings.Contains(i, fmt.Sprintf("migrating_slot(s): %s",
slotRange)) &&
strings.Contains(i, fmt.Sprintf("migrating_state: %s",
state))
}, d, 100*time.Millisecond)
}
func requireMigrateState(t testing.TB, client *redis.Client, slot int, state
SlotMigrationState) {
i := client.ClusterInfo(context.Background()).Val()
- require.Contains(t, i, fmt.Sprintf("migrating_slot: %d", slot))
+ require.Contains(t, i, fmt.Sprintf("migrating_slot(s): %d", slot))
+ require.Contains(t, i, fmt.Sprintf("migrating_state: %s", state))
+}
+
+func requireMigrateSlotRangeState(t testing.TB, client *redis.Client,
slotRange string, state SlotMigrationState) {
+ i := client.ClusterInfo(context.Background()).Val()
+ require.Contains(t, i, fmt.Sprintf("migrating_slot(s): %s", slotRange))
require.Contains(t, i, fmt.Sprintf("migrating_state: %s", state))
}
func waitForImportState(t testing.TB, client *redis.Client, n int, state
SlotImportState) {
require.Eventually(t, func() bool {
i := client.ClusterInfo(context.Background()).Val()
- return strings.Contains(i, fmt.Sprintf("importing_slot: %d",
n)) &&
+ return strings.Contains(i, fmt.Sprintf("importing_slot(s): %d",
n)) &&
strings.Contains(i, fmt.Sprintf("import_state: %s",
state))
}, 10*time.Second, 100*time.Millisecond)
}
+func migrateSlotRangeAndSetSlot(t *testing.T, ctx context.Context, source
*redis.Client, dest *redis.Client, destID string, slotRange string) {
+ require.Equal(t, "OK", source.Do(ctx, "clusterx", "migrate", slotRange,
destID).Val())
+ waitForMigrateSlotRangeState(t, source, slotRange,
SlotMigrationStateSuccess)
+ sourceVersion, _ := source.Do(ctx, "clusterx", "version").Int()
+ destVersion, _ := dest.Do(ctx, "clusterx", "version").Int()
+ require.NoError(t, source.Do(ctx, "clusterx", "setslot", slotRange,
"node", destID, sourceVersion+1).Err())
+ require.NoError(t, dest.Do(ctx, "clusterx", "setslot", slotRange,
"node", destID, destVersion+1).Err())
+}
+
+func TestSlotRangeMigrate(t *testing.T) {
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ rdb0 := srv0.NewClient()
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ defer func() { srv0.Close() }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ srv1Alive := true
+ defer func() {
+ if srv1Alive {
+ srv1.Close()
+ }
+ }()
+
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id0,
srv0.Host(), srv0.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1,
srv1.Host(), srv1.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
+
+ for slot := 0; slot < 500; slot++ {
+ for i := 0; i < 10; i++ {
+ require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[slot], i).Err())
+ }
+ }
+
+ t.Run("MIGRATE - Slot range migration basic cases", func(t *testing.T) {
+ migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "0-9")
+ nodes := rdb0.ClusterNodes(ctx).Val()
+ require.Contains(t, nodes, "10-10000", "0-9 10001-16383")
+ })
+
+ t.Run("MIGRATE - Special slot range cases", func(t *testing.T) {
+ for i := 0; i < 10; i++ {
+ require.NoError(t, rdb1.LPush(ctx,
util.SlotTable[16383], i).Err())
+ }
+
+ migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "10")
+ time.Sleep(1 * time.Second)
+ migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "11-11")
+ time.Sleep(1 * time.Second)
+ migrateSlotRangeAndSetSlot(t, ctx, rdb1, rdb0, id0,
"16383-16383")
+ time.Sleep(1 * time.Second)
+ nodes := rdb0.ClusterNodes(ctx).Val()
+ require.Contains(t, nodes, "12-10000 16383", "0-11 10001-16382")
+
+ errMsg := "Invalid slot range"
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"-1", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"2-1", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"-1-3", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"-1--1", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"3--1", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"4-16384", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"16384-16384", id1).Err(), errMsg)
+
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"16384", id1).Err(), "Invalid slot id: out of numeric range")
+ })
+
+ t.Run("MIGRATE - Repeat migration cases", func(t *testing.T) {
+ // non-overlapping
+ migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "104-106")
+ time.Sleep(1 * time.Second)
+ migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "107-108")
+ time.Sleep(1 * time.Second)
+ migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "102-103")
+ time.Sleep(1 * time.Second)
+ nodes := rdb0.ClusterNodes(ctx).Val()
+ require.Contains(t, nodes, "12-101 109-10000 16383", "0-11
102-108 10001-16382")
+
+ // overlap
+ errMsg := "Can't migrate slot which doesn't belong to me"
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"100-102", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"100-104", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"108-109", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"106-109", id1).Err(), errMsg)
+
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"102-108", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"101-109", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"105", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"105-105", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"104-106", id1).Err(), errMsg)
+ })
+
+ t.Run("MIGRATE - Repeat migration cases, but does not immediately
update the topology via setslot", func(t *testing.T) {
+ // non-overlapping
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
"114-116", id1).Val())
+ waitForMigrateSlotRangeState(t, rdb0, "114-116",
SlotMigrationStateSuccess)
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
"117-118", id1).Val())
+ waitForMigrateSlotRangeState(t, rdb0, "117-118",
SlotMigrationStateSuccess)
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
"112-113", id1).Val())
+ waitForMigrateSlotRangeState(t, rdb0, "112-113",
SlotMigrationStateSuccess)
+ for slot := 112; slot <= 118; slot++ {
+ require.Contains(t, rdb0.LPush(ctx,
util.SlotTable[slot], 10).Err(), "MOVED")
+ }
+
+ // overlap
+ errMsg := "Can't migrate slot which has been migrated"
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"114-116", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"117-118", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"112", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"112-112", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"113", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"113-113", id1).Err(), errMsg)
+
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"112-113", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"112-120", id1).Err(), errMsg)
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate",
"110-112", id1).Err(), errMsg)
+ })
+
+ t.Run("MIGRATE - Failure cases", func(t *testing.T) {
+ largeSlot := 210
+ for i := 0; i < 20000; i++ {
+ require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[largeSlot], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
"200-220", id1).Val())
+ requireMigrateSlotRangeState(t, rdb0, "200-220",
SlotMigrationStateStarted)
+ srv1Alive = false
+ srv1.Close()
+ time.Sleep(time.Second)
+ // TODO: More precise migration failure slot range
+ waitForMigrateSlotRangeState(t, rdb0, "200-220",
SlotMigrationStateFailed)
+ })
+}
diff --git a/tests/gocase/unit/geo/geo_test.go
b/tests/gocase/unit/geo/geo_test.go
index ebbbcd32..a920ddab 100644
--- a/tests/gocase/unit/geo/geo_test.go
+++ b/tests/gocase/unit/geo/geo_test.go
@@ -38,9 +38,10 @@ func geoDegrad(deg float64) float64 {
return deg * math.Atan(1) * 8 / 360
}
-func geoRandomPoint() (float64, float64) {
- lon := (-180 + rand.Float64()*360)
- lat := (-70 + rand.Float64()*140)
+func geoRandomPointWithSeed(seed int64) (float64, float64) {
+ r := rand.New(rand.NewSource(seed))
+ lon := (-180 + r.Float64()*360)
+ lat := (-70 + r.Float64()*140)
return lon, lat
}
@@ -434,26 +435,26 @@ var testGeo = func(t *testing.T, enabledRESP3 string) {
t.Run("GEOADD + GEORANGE randomized test", func(t *testing.T) {
for attempt := 0; attempt < 30; attempt++ {
var debuginfo string
+ var seed int64
if attempt < len(regressionVectors) {
- rand.Seed(regressionVectors[attempt].seed)
- debuginfo += "rand seed is " +
strconv.FormatInt(regressionVectors[attempt].seed, 10)
+ seed = regressionVectors[attempt].seed
} else {
- tmp := time.Now().UnixNano()
- rand.Seed(tmp)
- debuginfo += "rand seed is " +
strconv.FormatInt(tmp, 10)
+ seed = time.Now().UnixNano()
}
+ debuginfo += "rand seed is " + strconv.FormatInt(seed,
10)
+
require.NoError(t, rdb.Del(ctx, "mypoints").Err())
var radiusKm int64
- if util.RandomInt(10) == 0 {
- radiusKm = util.RandomInt(50000) + 10
+ if util.RandomIntWithSeed(10, seed) == 0 {
+ radiusKm = util.RandomIntWithSeed(50000, seed)
+ 10
} else {
- radiusKm = util.RandomInt(200) + 10
+ radiusKm = util.RandomIntWithSeed(200, seed) +
10
}
if attempt < len(regressionVectors) {
radiusKm = regressionVectors[attempt].km
}
radiusM := radiusKm * 1000
- searchLon, searchLat := geoRandomPoint()
+ searchLon, searchLat := geoRandomPointWithSeed(seed)
if attempt < len(regressionVectors) {
searchLon = regressionVectors[attempt].lon
searchLat = regressionVectors[attempt].lat
@@ -462,7 +463,7 @@ var testGeo = func(t *testing.T, enabledRESP3 string) {
var result []string
var argvs []*redis.GeoLocation
for j := 0; j < 20000; j++ {
- lon, lat := geoRandomPoint()
+ lon, lat := geoRandomPointWithSeed(seed)
argvs = append(argvs,
&redis.GeoLocation{Longitude: lon, Latitude: lat, Name: "place:" +
strconv.Itoa(j)})
distance := geoDistance(lon, lat, searchLon,
searchLat)
if distance < float64(radiusM) {
diff --git a/tests/gocase/unit/type/list/list_test.go
b/tests/gocase/unit/type/list/list_test.go
index e7f744e0..c440ebab 100644
--- a/tests/gocase/unit/type/list/list_test.go
+++ b/tests/gocase/unit/type/list/list_test.go
@@ -53,7 +53,6 @@ func TestLTRIM(t *testing.T) {
key := "myList"
startLen := int64(32)
- rand.Seed(0)
for typ, value := range largeValue {
t.Run(fmt.Sprintf("LTRIM stress testing - %s", typ), func(t
*testing.T) {
var myList []string
@@ -99,8 +98,6 @@ func TestZipList(t *testing.T) {
})
defer func() { require.NoError(t, rdb.Close()) }()
- rand.Seed(0)
-
t.Run("Explicit regression for a list bug", func(t *testing.T) {
key := "l"
myList := []string{
diff --git a/tests/gocase/unit/type/zset/zset_test.go
b/tests/gocase/unit/type/zset/zset_test.go
index 04807e4d..e3d60912 100644
--- a/tests/gocase/unit/type/zset/zset_test.go
+++ b/tests/gocase/unit/type/zset/zset_test.go
@@ -612,7 +612,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx
context.Context, enabledRES
for i := 0; i < 20; i++ {
var args [3]int64
for j := 0; j < 3; j++ {
- rand.Seed(time.Now().UnixNano())
+ rand :=
rand.New(rand.NewSource(time.Now().UnixNano()))
args[j] = rand.Int63n(20) - 10
}
if args[2] == 0 {
diff --git a/tests/gocase/util/random.go b/tests/gocase/util/random.go
index 15acc4b8..f0a7c020 100644
--- a/tests/gocase/util/random.go
+++ b/tests/gocase/util/random.go
// RandomInt returns a pseudo-random integer in [0, max) drawn from the
// package-global source (non-negative because Int63 is non-negative).
func RandomInt(max int64) int64 {
	v := rand.Int63()
	return v % max
}
+func RandomIntWithSeed(max, seed int64) int64 {
+ r := rand.New(rand.NewSource(seed))
+ return r.Int63() % max
+}
+
// RandomBool returns a pseudo-random boolean with equal probability for
// each outcome (RandomInt(2) yields 0 or 1).
func RandomBool() bool {
	return RandomInt(2) == 1
}
@@ -114,3 +119,38 @@ func RandomValue() string {
},
)
}
+
+func RandomValueWithSeed(seed int64) string {
+ return RandPath(
+ // Small enough to likely collide
+ func() string {
+ return fmt.Sprintf("%d", RandomSignedInt(1000))
+ },
+ // 32 bit compressible signed/unsigned
+ func() string {
+ return RandPath(
+ func() string {
+ return fmt.Sprintf("%d",
rand.Int63n(2000000000))
+ },
+ func() string {
+ return fmt.Sprintf("%d",
rand.Int63n(4000000000))
+ },
+ )
+ },
+ // 64 bit
+ func() string {
+ return fmt.Sprintf("%d", rand.Int63n(1000000000000))
+ },
+ // Random string
+ func() string {
+ return RandPath(
+ func() string {
+ return RandString(0, 256, Alpha)
+ },
+ func() string {
+ return RandString(0, 256, Binary)
+ },
+ )
+ },
+ )
+}