This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 3dc16f34 Fix the replica should remove the master link when receiving 
the CLUSTER RESET command (#2259)
3dc16f34 is described below

commit 3dc16f3459ed6602824135fdfc6ee56ca0af5a3e
Author: hulk <[email protected]>
AuthorDate: Sat Apr 20 23:38:04 2024 +0800

    Fix the replica should remove the master link when receiving the CLUSTER 
RESET command (#2259)
    
    This PR also allows using SOFT|HARD in the CLUSTER RESET command,
    but they have the same meaning in the Kvrocks cluster, since keeping
    the cluster's current epoch (version) is meaningless to it.
---
 src/cluster/cluster.cc                           |  6 +++++
 src/commands/cmd_cluster.cc                      | 11 ++++++--
 tests/gocase/integration/cluster/cluster_test.go | 32 ++++++++++++++++++------
 3 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index ed393078..1244ea4a 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -871,6 +871,8 @@ Status Cluster::CanExecByMySelf(const 
redis::CommandAttributes *attributes, cons
           fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host, 
slots_nodes_[slot]->port)};
 }
 
+// Only HARD mode is meaningful to the Kvrocks cluster,
+// so it will force clearing all information after resetting.
 Status Cluster::Reset() {
   if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) {
     return {Status::NotOK, "Can't reset cluster while migrating slot"};
@@ -881,6 +883,10 @@ Status Cluster::Reset() {
   if (!srv_->storage->IsEmptyDB()) {
     return {Status::NotOK, "Can't reset cluster while database is not empty"};
   }
+  if (srv_->IsSlave()) {
+    auto s = srv_->RemoveMaster();
+    if (!s.IsOK()) return s;
+  }
 
   version_ = -1;
   size_ = 0;
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index df0c45ef..e371e78f 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -32,10 +32,17 @@ class CommandCluster : public Commander {
   Status Parse(const std::vector<std::string> &args) override {
     subcommand_ = util::ToLower(args[1]);
 
-    if (args.size() == 2 &&
-        (subcommand_ == "nodes" || subcommand_ == "slots" || subcommand_ == 
"info" || subcommand_ == "reset"))
+    if (args.size() == 2 && (subcommand_ == "nodes" || subcommand_ == "slots" 
|| subcommand_ == "info"))
       return Status::OK();
 
+    // CLUSTER RESET [HARD|SOFT]
+    if (subcommand_ == "reset" && (args_.size() == 2 || args_.size() == 3)) {
+      if (args_.size() == 3 && !util::EqualICase(args_[2], "hard") && 
!util::EqualICase(args_[2], "soft")) {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+      return Status::OK();
+    }
+
     if (subcommand_ == "keyslot" && args_.size() == 3) return Status::OK();
 
     if (subcommand_ == "import") {
diff --git a/tests/gocase/integration/cluster/cluster_test.go 
b/tests/gocase/integration/cluster/cluster_test.go
index a26dc74a..fad7d012 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -473,33 +473,51 @@ func TestClusterReset(t *testing.T) {
        id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
        require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
 
+       srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       defer func() { srv2.Close() }()
+       rdb2 := srv2.NewClientWithOption(&redis.Options{PoolSize: 1})
+       defer func() { require.NoError(t, rdb2.Close()) }()
+       id2 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx02"
+       require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODEID", id2).Err())
+
        clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", id0, 
srv0.Host(), srv0.Port())
-       clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383", id1, 
srv1.Host(), srv1.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1, 
srv1.Host(), srv1.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", id2, srv2.Host(), 
srv2.Port(), id1)
        require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
        require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+       require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
 
        t.Run("cannot reset cluster if the db is not empty", func(t *testing.T) 
{
                key := util.SlotTable[0]
                require.NoError(t, rdb0.Set(ctx, key, "value", 0).Err())
-               require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(), 
"Can't reset cluster while database is not empty")
+               require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't 
reset cluster while database is not empty")
                require.NoError(t, rdb0.Del(ctx, key).Err())
-               require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+               require.NoError(t, rdb0.ClusterResetSoft(ctx).Err())
                require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", 
"version").Val())
                // reset the cluster topology to avoid breaking other test cases
                require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
        })
 
+       t.Run("replica should become master after reset", func(t *testing.T) {
+               require.Eventually(t, func() bool {
+                       return util.FindInfoEntry(rdb2, "role") == "slave"
+               }, 5*time.Second, 50*time.Millisecond)
+               require.NoError(t, rdb2.ClusterResetHard(ctx).Err())
+               require.Equal(t, "master", util.FindInfoEntry(rdb2, "role"))
+               require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       })
+
        t.Run("cannot reset cluster if the db is importing the slot", func(t 
*testing.T) {
                slotNum := 1
                require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
                clusterInfo := rdb1.ClusterInfo(ctx).Val()
                require.Contains(t, clusterInfo, "importing_slot: 1")
                require.Contains(t, clusterInfo, "import_state: start")
-               require.Contains(t, rdb1.Do(ctx, "cluster", "reset").Err(), 
"Can't reset cluster while importing slot")
+               require.Contains(t, rdb1.ClusterResetHard(ctx).Err(), "Can't 
reset cluster while importing slot")
                require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", 
slotNum, 1).Val())
                clusterInfo = rdb1.ClusterInfo(ctx).Val()
                require.Contains(t, clusterInfo, "import_state: success")
-               require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+               require.NoError(t, rdb0.ClusterResetHard(ctx).Err())
                require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", 
"version").Val())
                // reset the cluster topology to avoid breaking other test cases
                require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
@@ -517,7 +535,7 @@ func TestClusterReset(t *testing.T) {
                clusterInfo := rdb0.ClusterInfo(ctx).Val()
                require.Contains(t, clusterInfo, "migrating_slot: 2")
                require.Contains(t, clusterInfo, "migrating_state: start")
-               require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(), 
"Can't reset cluster while migrating slot")
+               require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't 
reset cluster while migrating slot")
 
                // wait for the migration to finish
                require.Eventually(t, func() bool {
@@ -528,7 +546,7 @@ func TestClusterReset(t *testing.T) {
                // the keys are removed from the source node right now.
                require.NoError(t, rdb0.FlushAll(ctx).Err())
 
-               require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+               require.NoError(t, rdb0.ClusterResetHard(ctx).Err())
                require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", 
"version").Val())
                // reset the cluster topology to avoid breaking other test cases
                require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())

Reply via email to