This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 3dc16f34 Fix the replica should remove the master link when receiving
the CLUSTER RESET command (#2259)
3dc16f34 is described below
commit 3dc16f3459ed6602824135fdfc6ee56ca0af5a3e
Author: hulk <[email protected]>
AuthorDate: Sat Apr 20 23:38:04 2024 +0800
Fix the replica should remove the master link when receiving the CLUSTER
RESET command (#2259)
This PR also allows using SOFT|HARD in the CLUSTER RESET command,
but it has the same meaning in the Kvrocks cluster since keeping
the cluster's current epoch(version) is meaningless to it.
---
src/cluster/cluster.cc | 6 +++++
src/commands/cmd_cluster.cc | 11 ++++++--
tests/gocase/integration/cluster/cluster_test.go | 32 ++++++++++++++++++------
3 files changed, 40 insertions(+), 9 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index ed393078..1244ea4a 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -871,6 +871,8 @@ Status Cluster::CanExecByMySelf(const
redis::CommandAttributes *attributes, cons
fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host,
slots_nodes_[slot]->port)};
}
+// Only HARD mode is meaningful to the Kvrocks cluster,
+// so it will force clearing all information after resetting.
Status Cluster::Reset() {
if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) {
return {Status::NotOK, "Can't reset cluster while migrating slot"};
@@ -881,6 +883,10 @@ Status Cluster::Reset() {
if (!srv_->storage->IsEmptyDB()) {
return {Status::NotOK, "Can't reset cluster while database is not empty"};
}
+ if (srv_->IsSlave()) {
+ auto s = srv_->RemoveMaster();
+ if (!s.IsOK()) return s;
+ }
version_ = -1;
size_ = 0;
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index df0c45ef..e371e78f 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -32,10 +32,17 @@ class CommandCluster : public Commander {
Status Parse(const std::vector<std::string> &args) override {
subcommand_ = util::ToLower(args[1]);
- if (args.size() == 2 &&
- (subcommand_ == "nodes" || subcommand_ == "slots" || subcommand_ ==
"info" || subcommand_ == "reset"))
+ if (args.size() == 2 && (subcommand_ == "nodes" || subcommand_ == "slots"
|| subcommand_ == "info"))
return Status::OK();
+ // CLUSTER RESET [HARD|SOFT]
+ if (subcommand_ == "reset" && (args_.size() == 2 || args_.size() == 3)) {
+ if (args_.size() == 3 && !util::EqualICase(args_[2], "hard") &&
!util::EqualICase(args_[2], "soft")) {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+ return Status::OK();
+ }
+
if (subcommand_ == "keyslot" && args_.size() == 3) return Status::OK();
if (subcommand_ == "import") {
diff --git a/tests/gocase/integration/cluster/cluster_test.go
b/tests/gocase/integration/cluster/cluster_test.go
index a26dc74a..fad7d012 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -473,33 +473,51 @@ func TestClusterReset(t *testing.T) {
id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+ srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv2.Close() }()
+ rdb2 := srv2.NewClientWithOption(&redis.Options{PoolSize: 1})
+ defer func() { require.NoError(t, rdb2.Close()) }()
+ id2 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx02"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", id0,
srv0.Host(), srv0.Port())
- clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383", id1,
srv1.Host(), srv1.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1,
srv1.Host(), srv1.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d slave %s", id2, srv2.Host(),
srv2.Port(), id1)
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
+ require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
t.Run("cannot reset cluster if the db is not empty", func(t *testing.T)
{
key := util.SlotTable[0]
require.NoError(t, rdb0.Set(ctx, key, "value", 0).Err())
- require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(),
"Can't reset cluster while database is not empty")
+ require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't
reset cluster while database is not empty")
require.NoError(t, rdb0.Del(ctx, key).Err())
- require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+ require.NoError(t, rdb0.ClusterResetSoft(ctx).Err())
require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx",
"version").Val())
// reset the cluster topology to avoid breaking other test cases
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())
})
+ t.Run("replica should become master after reset", func(t *testing.T) {
+ require.Eventually(t, func() bool {
+ return util.FindInfoEntry(rdb2, "role") == "slave"
+ }, 5*time.Second, 50*time.Millisecond)
+ require.NoError(t, rdb2.ClusterResetHard(ctx).Err())
+ require.Equal(t, "master", util.FindInfoEntry(rdb2, "role"))
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())
+ })
+
t.Run("cannot reset cluster if the db is importing the slot", func(t
*testing.T) {
slotNum := 1
require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import",
slotNum, 0).Val())
clusterInfo := rdb1.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 1")
require.Contains(t, clusterInfo, "import_state: start")
- require.Contains(t, rdb1.Do(ctx, "cluster", "reset").Err(),
"Can't reset cluster while importing slot")
+ require.Contains(t, rdb1.ClusterResetHard(ctx).Err(), "Can't
reset cluster while importing slot")
require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import",
slotNum, 1).Val())
clusterInfo = rdb1.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "import_state: success")
- require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+ require.NoError(t, rdb0.ClusterResetHard(ctx).Err())
require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx",
"version").Val())
// reset the cluster topology to avoid breaking other test cases
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())
@@ -517,7 +535,7 @@ func TestClusterReset(t *testing.T) {
clusterInfo := rdb0.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "migrating_slot: 2")
require.Contains(t, clusterInfo, "migrating_state: start")
- require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(),
"Can't reset cluster while migrating slot")
+ require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't
reset cluster while migrating slot")
// wait for the migration to finish
require.Eventually(t, func() bool {
@@ -528,7 +546,7 @@ func TestClusterReset(t *testing.T) {
// the keys are removed from the source node right now.
require.NoError(t, rdb0.FlushAll(ctx).Err())
- require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+ require.NoError(t, rdb0.ClusterResetHard(ctx).Err())
require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx",
"version").Val())
// reset the cluster topology to avoid breaking other test cases
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())