This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 3938a951 Fix missing migrating/importing information in the CLUSTER
NODES command (#2196)
3938a951 is described below
commit 3938a951b8a01ce6eb3bfcb0cbb5ba4ce2e17f2a
Author: hulk <[email protected]>
AuthorDate: Mon Mar 25 08:47:10 2024 +0800
Fix missing migrating/importing information in the CLUSTER NODES command
(#2196)
Redis appends a migrating/importing slot section to the CLUSTER NODES
entry of the source and target nodes. The migrating (source) node
shows the following section:
```
[{slot_id}->-{target_node_id}]
```
and the importing (target) node shows:
```
[{slot_id}-<-{source_node_id}]
```
---
src/cluster/cluster.cc | 56 +++++++++++++--------
src/cluster/cluster.h | 1 +
src/cluster/slot_import.cc | 58 ++++++++--------------
src/cluster/slot_import.h | 9 ++--
src/cluster/slot_migrate.h | 1 +
tests/cppunit/cluster_test.cc | 27 +++++++---
.../integration/slotimport/slotimport_test.go | 6 +++
.../integration/slotmigrate/slotmigrate_test.go | 4 ++
8 files changed, 93 insertions(+), 69 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index f6f3525b..bb6c30de 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -319,40 +319,34 @@ Status Cluster::ImportSlot(redis::Connection *conn, int
slot, int state) {
return {Status::NotOK, errSlotOutOfRange};
}
+ Status s;
switch (state) {
case kImportStart:
- if (!srv_->slot_import->Start(conn->GetFD(), slot)) {
- return {Status::NotOK, fmt::format("Can't start importing slot {}",
slot)};
- }
+ s = srv_->slot_import->Start(slot);
+ if (!s.IsOK()) return s;
// Set link importing
conn->SetImporting();
myself_->importing_slot = slot;
// Set link error callback
- conn->close_cb = [object_ptr = srv_->slot_import.get(), capture_fd =
conn->GetFD()](int fd) {
- object_ptr->StopForLinkError(capture_fd);
- };
- // Stop forbidding writing slot to accept write commands
+ // When the importing link closes, stop the import via StopForLinkError().
+ conn->close_cb = [object_ptr = srv_->slot_import.get()](int /*fd*/) {
+ auto stop_status = object_ptr->StopForLinkError();
+ if (!stop_status.IsOK()) {
+ LOG(ERROR) << "[import] Failed to stop importing slot: " << stop_status.Msg();
+ }
+ };
+ // Stop forbidding writing slot to accept write commands
if (slot == srv_->slot_migrator->GetForbiddenSlot())
srv_->slot_migrator->ReleaseForbiddenSlot();
LOG(INFO) << "[import] Start importing slot " << slot;
break;
case kImportSuccess:
- if (!srv_->slot_import->Success(slot)) {
- LOG(ERROR) << "[import] Failed to set slot importing success, maybe
slot is wrong"
- << ", received slot: " << slot << ", current slot: " <<
srv_->slot_import->GetSlot();
- return {Status::NotOK, fmt::format("Failed to set slot {} importing
success", slot)};
- }
-
- LOG(INFO) << "[import] Succeed to import slot " << slot;
+ s = srv_->slot_import->Success(slot);
+ if (!s.IsOK()) return s;
+ LOG(INFO) << "[import] Marked importing slot " << slot << " as succeeded";
break;
case kImportFailed:
- if (!srv_->slot_import->Fail(slot)) {
- LOG(ERROR) << "[import] Failed to set slot importing error, maybe slot
is wrong"
- << ", received slot: " << slot << ", current slot: " <<
srv_->slot_import->GetSlot();
- return {Status::NotOK, fmt::format("Failed to set slot {} importing
error", slot)};
- }
-
- LOG(INFO) << "[import] Failed to import slot " << slot;
+ s = srv_->slot_import->Fail(slot);
+ if (!s.IsOK()) return s;
+ LOG(INFO) << "[import] Marked importing slot " << slot << " as failed";
break;
default:
return {Status::NotOK, errInvalidImportState};
@@ -471,6 +465,11 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) {
return Status::OK();
}
+// Returns the ID of the node that owns `slot` in the current topology, or an
+// empty string if the slot is out of range or not assigned to any node.
+std::string Cluster::getNodeIDBySlot(int slot) const {
+ if (slot < 0 || slot >= kClusterSlots || !slots_nodes_[slot]) return "";
+ return slots_nodes_[slot]->id;
+}
+
std::string Cluster::genNodesDescription() {
auto slots_infos = getClusterNodeSlots();
@@ -502,6 +501,21 @@ std::string Cluster::genNodesDescription() {
}
}
+ // Just for MYSELF node to show the importing/migrating slot
+ // NOTE(review): this dereferences srv_; a Cluster constructed with a null
+ // Server must not reach this path.
+ if (n->id == myid_) {
+ // Migration source: append " [<slot>->-<target node id>]", as Redis does.
+ if (srv_->slot_migrator) {
+ auto migrating_slot = srv_->slot_migrator->GetMigratingSlot();
+ if (migrating_slot != -1) {
+ node_str.append(fmt::format(" [{}->-{}]", migrating_slot,
srv_->slot_migrator->GetDstNode()));
+ }
+ }
+ // Import target: append " [<slot>-<-<source node id>]". The source is taken
+ // to be the slot's current owner, so the ID is empty if the slot is unassigned.
+ if (srv_->slot_import) {
+ auto importing_slot = srv_->slot_import->GetSlot();
+ if (importing_slot != -1) {
+ node_str.append(fmt::format(" [{}-<-{}]", importing_slot,
getNodeIDBySlot(importing_slot)));
+ }
+ }
+ }
nodes_desc.append(node_str + "\n");
}
return nodes_desc;
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 8b8132ab..0afc832e 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -93,6 +93,7 @@ class Cluster {
static bool SubCommandIsExecExclusive(const std::string &subcommand);
private:
+ // Owner node ID of `slot`, or "" if the slot is out of range or unassigned.
+ std::string getNodeIDBySlot(int slot) const;
std::string genNodesDescription();
std::string genNodesInfo();
std::map<std::string, std::string> getClusterNodeSlots() const;
diff --git a/src/cluster/slot_import.cc b/src/cluster/slot_import.cc
index 66963611..a3988e76 100644
--- a/src/cluster/slot_import.cc
+++ b/src/cluster/slot_import.cc
@@ -21,82 +21,64 @@
#include "slot_import.h"
SlotImport::SlotImport(Server *srv)
- : Database(srv->storage, kDefaultNamespace),
- srv_(srv),
- import_slot_(-1),
- import_status_(kImportNone),
- import_fd_(-1) {
+ : Database(srv->storage, kDefaultNamespace), srv_(srv), import_slot_(-1),
import_status_(kImportNone) {
std::lock_guard<std::mutex> guard(mutex_);
// Let metadata_cf_handle_ be nullptr, then get them in real time while use
them.
// See comments in SlotMigrator::SlotMigrator for detailed reason.
metadata_cf_handle_ = nullptr;
}
-bool SlotImport::Start(int fd, int slot) {
+// Starts importing `slot` after clearing any keys it already holds.
+// Only one import may be in progress at a time.
+Status SlotImport::Start(int slot) {
std::lock_guard<std::mutex> guard(mutex_);
if (import_status_ == kImportStart) {
- LOG(ERROR) << "[import] Only one slot importing is allowed"
- << ", current slot is " << import_slot_ << ", cannot import
slot " << slot;
- return false;
+ return {Status::NotOK, fmt::format("only one importing slot is allowed,
current slot is: {}", import_slot_)};
}
// Clean slot data first
auto s = ClearKeysOfSlot(namespace_, slot);
if (!s.ok()) {
- LOG(INFO) << "[import] Failed to clear keys of slot " << slot << "current
status is importing 'START'"
- << ", Err: " << s.ToString();
- return false;
+ return {Status::NotOK, fmt::format("clear keys of slot error: {}",
s.ToString())};
}
import_status_ = kImportStart;
import_slot_ = slot;
- import_fd_ = fd;
-
- return true;
+ return Status::OK();
}
-bool SlotImport::Success(int slot) {
+// Marks the import of `slot` as successful and records it in the cluster topology.
+Status SlotImport::Success(int slot) {
std::lock_guard<std::mutex> guard(mutex_);
if (import_slot_ != slot) {
- LOG(ERROR) << "[import] Wrong slot, importing slot: " << import_slot_ <<
", but got slot: " << slot;
- return false;
+ return {Status::NotOK, fmt::format("mismatch slot, importing slot: {}, but
got: {}", import_slot_, slot)};
}
Status s = srv_->cluster->SetSlotImported(import_slot_);
if (!s.IsOK()) {
- LOG(ERROR) << "[import] Failed to set slot, Err: " << s.Msg();
- return false;
+ // Keep the underlying reason; it is no longer logged anywhere else.
+ return {Status::NotOK, fmt::format("unable to set slot {} as imported: {}", slot, s.Msg())};
}
import_status_ = kImportSuccess;
- import_fd_ = -1;
-
- return true;
+ return Status::OK();
}
-bool SlotImport::Fail(int slot) {
+// Marks the import of `slot` as failed and removes the partially imported keys.
+Status SlotImport::Fail(int slot) {
std::lock_guard<std::mutex> guard(mutex_);
if (import_slot_ != slot) {
- LOG(ERROR) << "[import] Wrong slot, importing slot: " << import_slot_ <<
", but got slot: " << slot;
- return false;
+ return {Status::NotOK, fmt::format("mismatch slot, importing slot: {}, but
got: {}", import_slot_, slot)};
}
// Clean imported slot data
auto s = ClearKeysOfSlot(namespace_, slot);
if (!s.ok()) {
- LOG(INFO) << "[import] Failed to clear keys of slot " << slot << ",
current importing status is importing 'FAIL'"
- << ", Err: " << s.ToString();
+ // Still leave kImportStart so the slot is not reported as importing forever
+ // and Start() is not blocked. Fail() only checks the slot, so it can be retried.
+ import_status_ = kImportFailed;
+ return {Status::NotOK, fmt::format("clear keys of slot error: {}",
s.ToString())};
}
import_status_ = kImportFailed;
- import_fd_ = -1;
-
- return true;
+ return Status::OK();
}
-void SlotImport::StopForLinkError(int fd) {
+// Called when the importing connection closes; aborts an in-progress import.
+Status SlotImport::StopForLinkError() {
std::lock_guard<std::mutex> guard(mutex_);
- if (import_status_ != kImportStart) return;
+ // Nothing to stop when no import is in progress, e.g. the link closes after
+ // Success/Fail. Treat this as a no-op, not an error, so the close callback
+ // does not log a spurious ERROR on every normal disconnect.
+ if (import_status_ != kImportStart) return Status::OK();
// Maybe server has failovered
// Situation:
@@ -111,18 +93,20 @@ void SlotImport::StopForLinkError(int fd) {
// Clean imported slot data
auto s = ClearKeysOfSlot(namespace_, import_slot_);
if (!s.ok()) {
- LOG(WARNING) << "[import] Failed to clear keys of slot " << import_slot_
<< " Current status is link error"
- << ", Err: " << s.ToString();
+ // The link is gone, so no Success/Fail will follow. Leave kImportStart anyway
+ // so the slot is not reported as importing forever and a new import can start.
+ import_status_ = kImportFailed;
+ return {Status::NotOK, fmt::format("clear keys of slot error: {}",
s.ToString())};
}
}
- LOG(INFO) << "[import] Stop importing for link error, slot: " <<
import_slot_;
import_status_ = kImportFailed;
- import_fd_ = -1;
+ return Status::OK();
}
int SlotImport::GetSlot() {
std::lock_guard<std::mutex> guard(mutex_);
+ // Only report the slot while an import is in progress: import_slot_ keeps its
+ // last value after Success/Fail.
+ if (import_status_ != kImportStart) {
+ return -1;
+ }
return import_slot_;
}
diff --git a/src/cluster/slot_import.h b/src/cluster/slot_import.h
index 385aca0f..9f4bb72d 100644
--- a/src/cluster/slot_import.h
+++ b/src/cluster/slot_import.h
@@ -42,10 +42,10 @@ class SlotImport : public redis::Database {
explicit SlotImport(Server *srv);
~SlotImport() = default;
- bool Start(int fd, int slot);
- bool Success(int slot);
- bool Fail(int slot);
- void StopForLinkError(int fd);
+ // Import state transitions. Each returns a non-OK Status carrying the reason
+ // on failure instead of logging it; callers decide how to report it.
+ Status Start(int slot);
+ Status Success(int slot);
+ Status Fail(int slot);
+ Status StopForLinkError();
+ // Slot currently being imported, or -1 when no import is in progress.
int GetSlot();
int GetStatus();
void GetImportInfo(std::string *info);
@@ -55,5 +55,4 @@ class SlotImport : public redis::Database {
std::mutex mutex_;
int import_slot_;
int import_status_;
- int import_fd_;
};
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index e1faf404..e22ba47d 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -103,6 +103,7 @@ class SlotMigrator : public redis::Database {
SlotMigrationStage GetCurrentSlotMigrationStage() const { return
current_stage_; }
int16_t GetForbiddenSlot() const { return forbidden_slot_; }
int16_t GetMigratingSlot() const { return migrating_slot_; }
+ // Returns a copy of dst_node_ (the migration target's node ID, as shown in
+ // CLUSTER NODES). NOTE(review): unlike the int16_t getters this copies a
+ // std::string; confirm dst_node_ is not written concurrently with this read.
+ std::string GetDstNode() const { return dst_node_; }
void GetMigrationInfo(std::string *info) const;
void CancelSyncCtx();
diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc
index a19d05e0..810a1ca1 100644
--- a/tests/cppunit/cluster_test.cc
+++ b/tests/cppunit/cluster_test.cc
@@ -30,8 +30,15 @@
#include "cluster/cluster_defs.h"
#include "commands/commander.h"
#include "server/server.h"
+#include "test_base.h"
-TEST(Cluster, CluseterSetNodes) {
+// Fixture built on TestBase, which provides the storage_ member used below to
+// construct a real Server for tests that need one.
+class ClusterTest : public TestBase {
+ protected:
+ explicit ClusterTest() = default;
+ ~ClusterTest() override = default;
+};
+
+TEST_F(ClusterTest, CluseterSetNodes) {
Status s;
Cluster cluster(nullptr, {"127.0.0.1"}, 3002);
@@ -101,13 +108,21 @@ TEST(Cluster, CluseterSetNodes) {
ASSERT_TRUE(cluster.GetVersion() == 1);
}
-TEST(Cluster, CluseterGetNodes) {
+TEST_F(ClusterTest, CluseterGetNodes) {
const std::string nodes =
"07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
"slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca\n"
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
"master - 5461-10922";
- Cluster cluster(nullptr, {"127.0.0.1"}, 30002);
+ auto config = storage_->GetConfig();
+ // don't start workers
+ config->workers = 0;
+ Server server(storage_.get(), config);
+ // we don't need the server resource, so just stop it once it's started
+ server.Stop();
+ server.Join();
+
+ // A real Server is needed here, presumably because the node description now
+ // reads srv_->slot_migrator / srv_->slot_import.
+ Cluster cluster(&server, {"127.0.0.1"}, 30002);
Status s = cluster.SetClusterNodes(nodes, 1, false);
ASSERT_TRUE(s.IsOK());
@@ -139,7 +154,7 @@ TEST(Cluster, CluseterGetNodes) {
}
}
-TEST(Cluster, CluseterGetSlotInfo) {
+TEST_F(ClusterTest, CluseterGetSlotInfo) {
const std::string nodes =
"07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
"slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1\n"
@@ -161,7 +176,7 @@ TEST(Cluster, CluseterGetSlotInfo) {
ASSERT_TRUE(info.nodes[1].id == "07c37dfeb235213a872192d90877d0cd55635b91");
}
-TEST(Cluster, TestDumpAndLoadClusterNodesInfo) {
+TEST_F(ClusterTest, TestDumpAndLoadClusterNodesInfo) {
int64_t version = 2;
const std::string nodes =
"07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
@@ -200,7 +215,7 @@ TEST(Cluster, TestDumpAndLoadClusterNodesInfo) {
unlink(nodes_filename.c_str());
}
-TEST(Cluster, ClusterParseSlotRanges) {
+TEST_F(ClusterTest, ClusterParseSlotRanges) {
Status s;
Cluster cluster(nullptr, {"127.0.0.1"}, 3002);
const std::string node_id = "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1";
diff --git a/tests/gocase/integration/slotimport/slotimport_test.go
b/tests/gocase/integration/slotimport/slotimport_test.go
index b86f9275..9a1830fa 100644
--- a/tests/gocase/integration/slotimport/slotimport_test.go
+++ b/tests/gocase/integration/slotimport/slotimport_test.go
@@ -92,12 +92,18 @@ func TestImportedServer(t *testing.T) {
require.Contains(t, clusterInfo, "importing_slot: 1")
require.Contains(t, clusterInfo, "import_state: start")
+ // While the import is in progress, CLUSTER NODES shows "[<slot>-<-<source id>]".
+ clusterNodes := rdb.ClusterNodes(ctx).Val()
+ require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]",
slotNum, srvID))
// import success
require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import",
slotNum, 1).Val())
clusterInfo = rdb.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 1")
require.Contains(t, clusterInfo, "import_state: success")
+ // Once the import has finished, the importing section must be gone.
+ clusterNodes = rdb.ClusterNodes(ctx).Val()
+ require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]",
slotNum, srvID))
+
time.Sleep(50 * time.Millisecond)
require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val())
})
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index d782e64d..f54a428f 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -1023,6 +1023,10 @@ func TestSlotMigrateDataType(t *testing.T) {
require.NoError(t, rdb0.LPush(ctx,
util.SlotTable[testSlot], i).Err())
}
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
testSlot, id1).Val())
+
+ // While migrating, the source node's CLUSTER NODES shows "[<slot>->-<target id>]".
+ clusterNodes := rdb0.ClusterNodes(ctx).Val()
+ require.Contains(t, clusterNodes, fmt.Sprintf("[%d->-%s]",
testSlot, id1))
+
// should not finish 1.5s
time.Sleep(1500 * time.Millisecond)
requireMigrateState(t, rdb0, testSlot,
SlotMigrationStateStarted)