This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 3938a951 Fix missing migrating/importing information in the CLUSTER NODES command (#2196)
3938a951 is described below

commit 3938a951b8a01ce6eb3bfcb0cbb5ba4ce2e17f2a
Author: hulk <[email protected]>
AuthorDate: Mon Mar 25 08:47:10 2024 +0800

    Fix missing migrating/importing information in the CLUSTER NODES command (#2196)
    
    Redis appends a migrating/importing slot section to the CLUSTER NODES
    entry of the source and target nodes, but Kvrocks omitted it. With this
    change, the MYSELF entry includes that section. The migrating source
    node reports the following section:
    
    ```
    [{slot_id}->-{target_node_id}]
    ```
    
    The importing target node reports the following section:
    
    ```
    [{slot_id}-<-{source_node_id}]
    ```
---
 src/cluster/cluster.cc                             | 56 +++++++++++++--------
 src/cluster/cluster.h                              |  1 +
 src/cluster/slot_import.cc                         | 58 ++++++++--------------
 src/cluster/slot_import.h                          |  9 ++--
 src/cluster/slot_migrate.h                         |  1 +
 tests/cppunit/cluster_test.cc                      | 27 +++++++---
 .../integration/slotimport/slotimport_test.go      |  6 +++
 .../integration/slotmigrate/slotmigrate_test.go    |  4 ++
 8 files changed, 93 insertions(+), 69 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index f6f3525b..bb6c30de 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -319,40 +319,34 @@ Status Cluster::ImportSlot(redis::Connection *conn, int 
slot, int state) {
     return {Status::NotOK, errSlotOutOfRange};
   }
 
+  Status s;
   switch (state) {
     case kImportStart:
-      if (!srv_->slot_import->Start(conn->GetFD(), slot)) {
-        return {Status::NotOK, fmt::format("Can't start importing slot {}", 
slot)};
-      }
+      s = srv_->slot_import->Start(slot);
+      if (!s.IsOK()) return s;
 
       // Set link importing
       conn->SetImporting();
       myself_->importing_slot = slot;
       // Set link error callback
-      conn->close_cb = [object_ptr = srv_->slot_import.get(), capture_fd = 
conn->GetFD()](int fd) {
-        object_ptr->StopForLinkError(capture_fd);
-      };
-      // Stop forbidding writing slot to accept write commands
+      conn->close_cb = [object_ptr = srv_->slot_import.get()](int fd) {
+        auto s = object_ptr->StopForLinkError();
+        if (!s.IsOK()) {
+          LOG(ERROR) << "[import] Failed to stop importing slot: " << s.Msg();
+        }
+      };  // Stop forbidding writing slot to accept write commands
       if (slot == srv_->slot_migrator->GetForbiddenSlot()) 
srv_->slot_migrator->ReleaseForbiddenSlot();
       LOG(INFO) << "[import] Start importing slot " << slot;
       break;
     case kImportSuccess:
-      if (!srv_->slot_import->Success(slot)) {
-        LOG(ERROR) << "[import] Failed to set slot importing success, maybe 
slot is wrong"
-                   << ", received slot: " << slot << ", current slot: " << 
srv_->slot_import->GetSlot();
-        return {Status::NotOK, fmt::format("Failed to set slot {} importing 
success", slot)};
-      }
-
-      LOG(INFO) << "[import] Succeed to import slot " << slot;
+      s = srv_->slot_import->Success(slot);
+      if (!s.IsOK()) return s;
+      LOG(INFO) << "[import] Mark the importing slot as succeed" << slot;
       break;
     case kImportFailed:
-      if (!srv_->slot_import->Fail(slot)) {
-        LOG(ERROR) << "[import] Failed to set slot importing error, maybe slot 
is wrong"
-                   << ", received slot: " << slot << ", current slot: " << 
srv_->slot_import->GetSlot();
-        return {Status::NotOK, fmt::format("Failed to set slot {} importing 
error", slot)};
-      }
-
-      LOG(INFO) << "[import] Failed to import slot " << slot;
+      s = srv_->slot_import->Fail(slot);
+      if (!s.IsOK()) return s;
+      LOG(INFO) << "[import] Mark the importing slot as failed" << slot;
       break;
     default:
       return {Status::NotOK, errInvalidImportState};
@@ -471,6 +465,11 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) {
   return Status::OK();
 }
 
+std::string Cluster::getNodeIDBySlot(int slot) const {
+  if (slot < 0 || slot >= kClusterSlots || !slots_nodes_[slot]) return "";
+  return slots_nodes_[slot]->id;
+}
+
 std::string Cluster::genNodesDescription() {
   auto slots_infos = getClusterNodeSlots();
 
@@ -502,6 +501,21 @@ std::string Cluster::genNodesDescription() {
       }
     }
 
+    // Just for MYSELF node to show the importing/migrating slot
+    if (n->id == myid_) {
+      if (srv_->slot_migrator) {
+        auto migrating_slot = srv_->slot_migrator->GetMigratingSlot();
+        if (migrating_slot != -1) {
+          node_str.append(fmt::format(" [{}->-{}]", migrating_slot, 
srv_->slot_migrator->GetDstNode()));
+        }
+      }
+      if (srv_->slot_import) {
+        auto importing_slot = srv_->slot_import->GetSlot();
+        if (importing_slot != -1) {
+          node_str.append(fmt::format(" [{}-<-{}]", importing_slot, 
getNodeIDBySlot(importing_slot)));
+        }
+      }
+    }
     nodes_desc.append(node_str + "\n");
   }
   return nodes_desc;
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 8b8132ab..0afc832e 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -93,6 +93,7 @@ class Cluster {
   static bool SubCommandIsExecExclusive(const std::string &subcommand);
 
  private:
+  std::string getNodeIDBySlot(int slot) const;
   std::string genNodesDescription();
   std::string genNodesInfo();
   std::map<std::string, std::string> getClusterNodeSlots() const;
diff --git a/src/cluster/slot_import.cc b/src/cluster/slot_import.cc
index 66963611..a3988e76 100644
--- a/src/cluster/slot_import.cc
+++ b/src/cluster/slot_import.cc
@@ -21,82 +21,64 @@
 #include "slot_import.h"
 
 SlotImport::SlotImport(Server *srv)
-    : Database(srv->storage, kDefaultNamespace),
-      srv_(srv),
-      import_slot_(-1),
-      import_status_(kImportNone),
-      import_fd_(-1) {
+    : Database(srv->storage, kDefaultNamespace), srv_(srv), import_slot_(-1), 
import_status_(kImportNone) {
   std::lock_guard<std::mutex> guard(mutex_);
   // Let metadata_cf_handle_ be nullptr, then get them in real time while use 
them.
   // See comments in SlotMigrator::SlotMigrator for detailed reason.
   metadata_cf_handle_ = nullptr;
 }
 
-bool SlotImport::Start(int fd, int slot) {
+Status SlotImport::Start(int slot) {
   std::lock_guard<std::mutex> guard(mutex_);
   if (import_status_ == kImportStart) {
-    LOG(ERROR) << "[import] Only one slot importing is allowed"
-               << ", current slot is " << import_slot_ << ", cannot import 
slot " << slot;
-    return false;
+    return {Status::NotOK, fmt::format("only one importing slot is allowed, 
current slot is: {}", import_slot_)};
   }
 
   // Clean slot data first
   auto s = ClearKeysOfSlot(namespace_, slot);
   if (!s.ok()) {
-    LOG(INFO) << "[import] Failed to clear keys of slot " << slot << "current 
status is importing 'START'"
-              << ", Err: " << s.ToString();
-    return false;
+    return {Status::NotOK, fmt::format("clear keys of slot error: {}", 
s.ToString())};
   }
 
   import_status_ = kImportStart;
   import_slot_ = slot;
-  import_fd_ = fd;
-
-  return true;
+  return Status::OK();
 }
 
-bool SlotImport::Success(int slot) {
+Status SlotImport::Success(int slot) {
   std::lock_guard<std::mutex> guard(mutex_);
   if (import_slot_ != slot) {
-    LOG(ERROR) << "[import] Wrong slot, importing slot: " << import_slot_ << 
", but got slot: " << slot;
-    return false;
+    return {Status::NotOK, fmt::format("mismatch slot, importing slot: {}, but 
got: {}", import_slot_, slot)};
   }
 
   Status s = srv_->cluster->SetSlotImported(import_slot_);
   if (!s.IsOK()) {
-    LOG(ERROR) << "[import] Failed to set slot, Err: " << s.Msg();
-    return false;
+    return {Status::NotOK, fmt::format("unable to set imported status: {}", 
slot)};
   }
 
   import_status_ = kImportSuccess;
-  import_fd_ = -1;
-
-  return true;
+  return Status::OK();
 }
 
-bool SlotImport::Fail(int slot) {
+Status SlotImport::Fail(int slot) {
   std::lock_guard<std::mutex> guard(mutex_);
   if (import_slot_ != slot) {
-    LOG(ERROR) << "[import] Wrong slot, importing slot: " << import_slot_ << 
", but got slot: " << slot;
-    return false;
+    return {Status::NotOK, fmt::format("mismatch slot, importing slot: {}, but 
got: {}", import_slot_, slot)};
   }
 
   // Clean imported slot data
   auto s = ClearKeysOfSlot(namespace_, slot);
   if (!s.ok()) {
-    LOG(INFO) << "[import] Failed to clear keys of slot " << slot << ", 
current importing status is importing 'FAIL'"
-              << ", Err: " << s.ToString();
+    return {Status::NotOK, fmt::format("clear keys of slot error: {}", 
s.ToString())};
   }
 
   import_status_ = kImportFailed;
-  import_fd_ = -1;
-
-  return true;
+  return Status::OK();
 }
 
-void SlotImport::StopForLinkError(int fd) {
+Status SlotImport::StopForLinkError() {
   std::lock_guard<std::mutex> guard(mutex_);
-  if (import_status_ != kImportStart) return;
+  if (import_status_ != kImportStart) return {Status::NotOK, "no slot is 
importing"};
 
   // Maybe server has failovered
   // Situation:
@@ -111,18 +93,20 @@ void SlotImport::StopForLinkError(int fd) {
     // Clean imported slot data
     auto s = ClearKeysOfSlot(namespace_, import_slot_);
     if (!s.ok()) {
-      LOG(WARNING) << "[import] Failed to clear keys of slot " << import_slot_ 
<< " Current status is link error"
-                   << ", Err: " << s.ToString();
+      return {Status::NotOK, fmt::format("clear keys of slot error: {}", 
s.ToString())};
     }
   }
 
-  LOG(INFO) << "[import] Stop importing for link error, slot: " << 
import_slot_;
   import_status_ = kImportFailed;
-  import_fd_ = -1;
+  return Status::OK();
 }
 
 int SlotImport::GetSlot() {
   std::lock_guard<std::mutex> guard(mutex_);
+  // import_slot_ only be set when import_status_ is kImportStart
+  if (import_status_ != kImportStart) {
+    return -1;
+  }
   return import_slot_;
 }
 
diff --git a/src/cluster/slot_import.h b/src/cluster/slot_import.h
index 385aca0f..9f4bb72d 100644
--- a/src/cluster/slot_import.h
+++ b/src/cluster/slot_import.h
@@ -42,10 +42,10 @@ class SlotImport : public redis::Database {
   explicit SlotImport(Server *srv);
   ~SlotImport() = default;
 
-  bool Start(int fd, int slot);
-  bool Success(int slot);
-  bool Fail(int slot);
-  void StopForLinkError(int fd);
+  Status Start(int slot);
+  Status Success(int slot);
+  Status Fail(int slot);
+  Status StopForLinkError();
   int GetSlot();
   int GetStatus();
   void GetImportInfo(std::string *info);
@@ -55,5 +55,4 @@ class SlotImport : public redis::Database {
   std::mutex mutex_;
   int import_slot_;
   int import_status_;
-  int import_fd_;
 };
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index e1faf404..e22ba47d 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -103,6 +103,7 @@ class SlotMigrator : public redis::Database {
   SlotMigrationStage GetCurrentSlotMigrationStage() const { return 
current_stage_; }
   int16_t GetForbiddenSlot() const { return forbidden_slot_; }
   int16_t GetMigratingSlot() const { return migrating_slot_; }
+  std::string GetDstNode() const { return dst_node_; }
   void GetMigrationInfo(std::string *info) const;
   void CancelSyncCtx();
 
diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc
index a19d05e0..810a1ca1 100644
--- a/tests/cppunit/cluster_test.cc
+++ b/tests/cppunit/cluster_test.cc
@@ -30,8 +30,15 @@
 #include "cluster/cluster_defs.h"
 #include "commands/commander.h"
 #include "server/server.h"
+#include "test_base.h"
 
-TEST(Cluster, CluseterSetNodes) {
+class ClusterTest : public TestBase {
+ protected:
+  explicit ClusterTest() = default;
+  ~ClusterTest() override = default;
+};
+
+TEST_F(ClusterTest, CluseterSetNodes) {
   Status s;
   Cluster cluster(nullptr, {"127.0.0.1"}, 3002);
 
@@ -101,13 +108,21 @@ TEST(Cluster, CluseterSetNodes) {
   ASSERT_TRUE(cluster.GetVersion() == 1);
 }
 
-TEST(Cluster, CluseterGetNodes) {
+TEST_F(ClusterTest, CluseterGetNodes) {
   const std::string nodes =
       "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
       "slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca\n"
       "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
       "master - 5461-10922";
-  Cluster cluster(nullptr, {"127.0.0.1"}, 30002);
+  auto config = storage_->GetConfig();
+  // don't start workers
+  config->workers = 0;
+  Server server(storage_.get(), config);
+  // we don't need the server resource, so just stop it once it's started
+  server.Stop();
+  server.Join();
+
+  Cluster cluster(&server, {"127.0.0.1"}, 30002);
   Status s = cluster.SetClusterNodes(nodes, 1, false);
   ASSERT_TRUE(s.IsOK());
 
@@ -139,7 +154,7 @@ TEST(Cluster, CluseterGetNodes) {
   }
 }
 
-TEST(Cluster, CluseterGetSlotInfo) {
+TEST_F(ClusterTest, CluseterGetSlotInfo) {
   const std::string nodes =
       "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
       "slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1\n"
@@ -161,7 +176,7 @@ TEST(Cluster, CluseterGetSlotInfo) {
   ASSERT_TRUE(info.nodes[1].id == "07c37dfeb235213a872192d90877d0cd55635b91");
 }
 
-TEST(Cluster, TestDumpAndLoadClusterNodesInfo) {
+TEST_F(ClusterTest, TestDumpAndLoadClusterNodesInfo) {
   int64_t version = 2;
   const std::string nodes =
       "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
@@ -200,7 +215,7 @@ TEST(Cluster, TestDumpAndLoadClusterNodesInfo) {
   unlink(nodes_filename.c_str());
 }
 
-TEST(Cluster, ClusterParseSlotRanges) {
+TEST_F(ClusterTest, ClusterParseSlotRanges) {
   Status s;
   Cluster cluster(nullptr, {"127.0.0.1"}, 3002);
   const std::string node_id = "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1";
diff --git a/tests/gocase/integration/slotimport/slotimport_test.go 
b/tests/gocase/integration/slotimport/slotimport_test.go
index b86f9275..9a1830fa 100644
--- a/tests/gocase/integration/slotimport/slotimport_test.go
+++ b/tests/gocase/integration/slotimport/slotimport_test.go
@@ -92,12 +92,18 @@ func TestImportedServer(t *testing.T) {
                require.Contains(t, clusterInfo, "importing_slot: 1")
                require.Contains(t, clusterInfo, "import_state: start")
 
+               clusterNodes := rdb.ClusterNodes(ctx).Val()
+               require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", 
slotNum, srvID))
                // import success
                require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", 
slotNum, 1).Val())
                clusterInfo = rdb.ClusterInfo(ctx).Val()
                require.Contains(t, clusterInfo, "importing_slot: 1")
                require.Contains(t, clusterInfo, "import_state: success")
 
+               // import finish and should not contain the import section
+               clusterNodes = rdb.ClusterNodes(ctx).Val()
+               require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", 
slotNum, srvID))
+
                time.Sleep(50 * time.Millisecond)
                require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val())
        })
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go 
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index d782e64d..f54a428f 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -1023,6 +1023,10 @@ func TestSlotMigrateDataType(t *testing.T) {
                        require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[testSlot], i).Err())
                }
                require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
testSlot, id1).Val())
+
+               clusterNodes := rdb0.ClusterNodes(ctx).Val()
+               require.Contains(t, clusterNodes, fmt.Sprintf("[%d->-%s]", 
testSlot, id1))
+
                // should not finish 1.5s
                time.Sleep(1500 * time.Millisecond)
                requireMigrateState(t, rdb0, testSlot, 
SlotMigrationStateStarted)

Reply via email to