This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new c701b987 fix(cluster): should remove the master if it's not node in
the cluster (#2643)
c701b987 is described below
commit c701b987b47965d91f1068254907f9c7e52db592
Author: hulk <[email protected]>
AuthorDate: Mon Nov 4 12:12:24 2024 +0800
fix(cluster): should remove the master if it's not node in the cluster
(#2643)
Currently, a replica does not stop replicating from its master after the
replica has been removed from the cluster topology. This is unexpected
behavior for the master, which is still a live node in the cluster.
This fixes #2618.
---
src/cluster/cluster.cc | 19 ++++-----
tests/gocase/integration/cluster/cluster_test.go | 51 +++++++++++++++++-------
2 files changed, 46 insertions(+), 24 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index c4dd9b8c..3850e12f 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -75,9 +75,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
}
// Set replication relationship
- if (myself_) return SetMasterSlaveRepl();
-
- return Status::OK();
+ return SetMasterSlaveRepl();
}
// The reason why the new version MUST be +1 of current version is that,
@@ -204,11 +202,8 @@ Status Cluster::SetClusterNodes(const std::string
&nodes_str, int64_t version, b
}
// Set replication relationship
- if (myself_) {
- s = SetMasterSlaveRepl();
- if (!s.IsOK()) {
- return s.Prefixed("failed to set master-replica replication");
- }
+ if (auto s = SetMasterSlaveRepl(); !s.IsOK()) {
+ return s.Prefixed("failed to set master-replica replication");
}
// Clear data of migrated slots
@@ -234,7 +229,13 @@ Status Cluster::SetClusterNodes(const std::string
&nodes_str, int64_t version, b
Status Cluster::SetMasterSlaveRepl() {
if (!srv_) return Status::OK();
- if (!myself_) return Status::OK();
+ // If the node is not in the cluster topology, remove the master replication if it's a replica.
+ if (!myself_) {
+ if (auto s = srv_->RemoveMaster(); !s.IsOK()) {
+ return s.Prefixed("failed to remove master");
+ }
+ return Status::OK();
+ }
if (myself_->role == kClusterMaster) {
// Master mode
diff --git a/tests/gocase/integration/cluster/cluster_test.go
b/tests/gocase/integration/cluster/cluster_test.go
index 56047947..05ec27f0 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -154,58 +154,79 @@ func TestClusterNodes(t *testing.T) {
}
func TestClusterReplicas(t *testing.T) {
- srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
- defer srv.Close()
-
ctx := context.Background()
- rdb := srv.NewClient()
- defer func() { require.NoError(t, rdb.Close()) }()
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ rdb1 := srv1.NewClient()
+ srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ rdb2 := srv2.NewClient()
+
+ defer func() {
+ srv1.Close()
+ srv2.Close()
+ require.NoError(t, rdb1.Close())
+ require.NoError(t, rdb2.Close())
+ }()
nodes := ""
master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6"
- master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID,
srv.Host(), srv.Port())
+ master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID,
srv1.Host(), srv1.Port())
nodes += master1Node + "\n"
master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49"
- master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID,
srv.Host(), srv.Port())
+ master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID,
srv1.Host(), srv1.Port())
nodes += master2Node + "\n"
replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0"
- replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID,
srv.Host(), srv.Port(), master2ID)
+ replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID,
srv2.Host(), srv2.Port(), master2ID)
nodes += replica2Node
- require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes,
"2").Err())
- require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", nodes,
"2").Err())
+ require.EqualValues(t, "2", rdb1.Do(ctx, "clusterx", "version").Val())
+ require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", nodes,
"2").Err())
t.Run("with replicas", func(t *testing.T) {
- replicas, err := rdb.Do(ctx, "cluster", "replicas",
"159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
+ replicas, err := rdb1.Do(ctx, "cluster", "replicas",
"159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
require.NoError(t, err)
fields := strings.Split(replicas, " ")
require.Len(t, fields, 8)
- require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(),
srv.Port()+10000), fields[1])
+ require.Equal(t, fmt.Sprintf("%s@%d", srv2.HostPort(),
srv2.Port()+10000), fields[1])
require.Equal(t, "slave", fields[2])
require.Equal(t, master2ID, fields[3])
require.Equal(t, "connected\n", fields[7])
})
t.Run("without replicas", func(t *testing.T) {
- replicas, err := rdb.Do(ctx, "cluster", "replicas",
"bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
+ replicas, err := rdb1.Do(ctx, "cluster", "replicas",
"bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
require.NoError(t, err)
require.Empty(t, replicas)
})
t.Run("send command to replica", func(t *testing.T) {
- err := rdb.Do(ctx, "cluster", "replicas",
"7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
+ err := rdb1.Do(ctx, "cluster", "replicas",
"7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "The node isn't a master")
})
t.Run("unknown node", func(t *testing.T) {
- err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err()
+ err := rdb1.Do(ctx, "cluster", "replicas", "unknown").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "Invalid cluster node id")
})
+
+ t.Run("remove the replication if the node is not in the cluster",
func(t *testing.T) {
+ require.Equal(t, "slave", util.FindInfoEntry(rdb2, "role"))
+ // remove the cluster replica node
+ clusterNode := fmt.Sprintf("%s\n%s", master1Node, master2Node)
+ err := rdb1.Do(ctx, "clusterx", "SETNODES", clusterNode,
"3").Err()
+ require.NoError(t, err)
+ err = rdb2.Do(ctx, "clusterx", "SETNODES", clusterNode,
"3").Err()
+ require.NoError(t, err)
+
+ require.Eventually(t, func() bool {
+ return util.FindInfoEntry(rdb2, "role") == "master"
+ }, 5*time.Second, 100*time.Millisecond)
+ })
}
func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {