This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new c701b987 fix(cluster): should remove the master if it's not a node in
the cluster (#2643)
c701b987 is described below

commit c701b987b47965d91f1068254907f9c7e52db592
Author: hulk <[email protected]>
AuthorDate: Mon Nov 4 12:12:24 2024 +0800

    fix(cluster): should remove the master if it's not a node in the cluster
(#2643)
    
    Currently, a replica won't remove its master replication link
    when it is no longer a node in the cluster topology, which is
    unexpected behavior for the remaining live master nodes in the cluster.
    
    This fixes #2618.
---
 src/cluster/cluster.cc                           | 19 ++++-----
 tests/gocase/integration/cluster/cluster_test.go | 51 +++++++++++++++++-------
 2 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index c4dd9b8c..3850e12f 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -75,9 +75,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
   }
 
   // Set replication relationship
-  if (myself_) return SetMasterSlaveRepl();
-
-  return Status::OK();
+  return SetMasterSlaveRepl();
 }
 
 // The reason why the new version MUST be +1 of current version is that,
@@ -204,11 +202,8 @@ Status Cluster::SetClusterNodes(const std::string 
&nodes_str, int64_t version, b
   }
 
   // Set replication relationship
-  if (myself_) {
-    s = SetMasterSlaveRepl();
-    if (!s.IsOK()) {
-      return s.Prefixed("failed to set master-replica replication");
-    }
+  if (auto s = SetMasterSlaveRepl(); !s.IsOK()) {
+    return s.Prefixed("failed to set master-replica replication");
   }
 
   // Clear data of migrated slots
@@ -234,7 +229,13 @@ Status Cluster::SetClusterNodes(const std::string 
&nodes_str, int64_t version, b
 Status Cluster::SetMasterSlaveRepl() {
   if (!srv_) return Status::OK();
 
-  if (!myself_) return Status::OK();
+  // If the node is not in the cluster topology, remove the master replication if it's a replica.
+  if (!myself_) {
+    if (auto s = srv_->RemoveMaster(); !s.IsOK()) {
+      return s.Prefixed("failed to remove master");
+    }
+    return Status::OK();
+  }
 
   if (myself_->role == kClusterMaster) {
     // Master mode
diff --git a/tests/gocase/integration/cluster/cluster_test.go 
b/tests/gocase/integration/cluster/cluster_test.go
index 56047947..05ec27f0 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -154,58 +154,79 @@ func TestClusterNodes(t *testing.T) {
 }
 
 func TestClusterReplicas(t *testing.T) {
-       srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
-       defer srv.Close()
-
        ctx := context.Background()
-       rdb := srv.NewClient()
-       defer func() { require.NoError(t, rdb.Close()) }()
+       srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       rdb1 := srv1.NewClient()
+       srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       rdb2 := srv2.NewClient()
+
+       defer func() {
+               srv1.Close()
+               srv2.Close()
+               require.NoError(t, rdb1.Close())
+               require.NoError(t, rdb2.Close())
+       }()
 
        nodes := ""
 
        master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6"
-       master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, 
srv.Host(), srv.Port())
+       master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, 
srv1.Host(), srv1.Port())
        nodes += master1Node + "\n"
 
        master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49"
-       master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, 
srv.Host(), srv.Port())
+       master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, 
srv1.Host(), srv1.Port())
        nodes += master2Node + "\n"
 
        replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0"
-       replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, 
srv.Host(), srv.Port(), master2ID)
+       replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, 
srv2.Host(), srv2.Port(), master2ID)
        nodes += replica2Node
 
-       require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, 
"2").Err())
-       require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val())
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", nodes, 
"2").Err())
+       require.EqualValues(t, "2", rdb1.Do(ctx, "clusterx", "version").Val())
+       require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", nodes, 
"2").Err())
 
        t.Run("with replicas", func(t *testing.T) {
-               replicas, err := rdb.Do(ctx, "cluster", "replicas", 
"159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
+               replicas, err := rdb1.Do(ctx, "cluster", "replicas", 
"159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
                require.NoError(t, err)
                fields := strings.Split(replicas, " ")
                require.Len(t, fields, 8)
-               require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), 
srv.Port()+10000), fields[1])
+               require.Equal(t, fmt.Sprintf("%s@%d", srv2.HostPort(), 
srv2.Port()+10000), fields[1])
                require.Equal(t, "slave", fields[2])
                require.Equal(t, master2ID, fields[3])
                require.Equal(t, "connected\n", fields[7])
        })
 
        t.Run("without replicas", func(t *testing.T) {
-               replicas, err := rdb.Do(ctx, "cluster", "replicas", 
"bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
+               replicas, err := rdb1.Do(ctx, "cluster", "replicas", 
"bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
                require.NoError(t, err)
                require.Empty(t, replicas)
        })
 
        t.Run("send command to replica", func(t *testing.T) {
-               err := rdb.Do(ctx, "cluster", "replicas", 
"7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
+               err := rdb1.Do(ctx, "cluster", "replicas", 
"7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
                require.Error(t, err)
                require.Contains(t, err.Error(), "The node isn't a master")
        })
 
        t.Run("unknown node", func(t *testing.T) {
-               err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err()
+               err := rdb1.Do(ctx, "cluster", "replicas", "unknown").Err()
                require.Error(t, err)
                require.Contains(t, err.Error(), "Invalid cluster node id")
        })
+
+       t.Run("remove the replication if the node is not in the cluster", 
func(t *testing.T) {
+               require.Equal(t, "slave", util.FindInfoEntry(rdb2, "role"))
+               // remove the cluster replica node
+               clusterNode := fmt.Sprintf("%s\n%s", master1Node, master2Node)
+               err := rdb1.Do(ctx, "clusterx", "SETNODES", clusterNode, 
"3").Err()
+               require.NoError(t, err)
+               err = rdb2.Do(ctx, "clusterx", "SETNODES", clusterNode, 
"3").Err()
+               require.NoError(t, err)
+
+               require.Eventually(t, func() bool {
+                       return util.FindInfoEntry(rdb2, "role") == "master"
+               }, 5*time.Second, 100*time.Millisecond)
+       })
 }
 
 func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {

Reply via email to