This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5c8b6cba Add support of the CLUSTER RESET command (#2210)
5c8b6cba is described below
commit 5c8b6cbab40a6d01fea7087fb0220a1cc8caa9e3
Author: hulk <[email protected]>
AuthorDate: Mon Apr 1 11:55:36 2024 +0800
Add support of the CLUSTER RESET command (#2210)
---
src/cluster/cluster.cc | 30 ++++++++-
src/cluster/cluster.h | 1 +
src/commands/cmd_cluster.cc | 12 +++-
src/storage/storage.cc | 13 ++++
src/storage/storage.h | 1 +
tests/gocase/integration/cluster/cluster_test.go | 80 ++++++++++++++++++++++++
6 files changed, 134 insertions(+), 3 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 686b63a0..8e16270c 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -53,7 +53,7 @@ Cluster::Cluster(Server *srv, std::vector<std::string> binds, int port)
// cluster data, so these commands should be executed exclusively, and ReadWriteLock
// also can guarantee accessing data is safe.
bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {
- for (auto v : {"setnodes", "setnodeid", "setslot", "import"}) {
+ for (auto v : {"setnodes", "setnodeid", "setslot", "import", "reset"}) {
if (util::EqualICase(v, subcommand)) return true;
}
return false;
@@ -828,3 +828,31 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
return {Status::RedisExecErr,
fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host,
slots_nodes_[slot]->port)};
}
+
+Status Cluster::Reset() {  // Clears this node's in-memory cluster state; returns NotOK (nothing changed) if a slot is migrating/importing or the DB has data.
+ if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) {  // any value other than -1 is reported as a migration in progress
+ return {Status::NotOK, "Can't reset cluster while migrating slot"};
+ }
+ if (srv_->slot_import && srv_->slot_import->GetSlot() != -1) {  // any value other than -1 is reported as an import in progress
+ return {Status::NotOK, "Can't reset cluster while importing slot"};
+ }
+ if (!srv_->storage->IsEmptyDB()) {  // refuse to drop the topology while the storage still reports data
+ return {Status::NotOK, "Can't reset cluster while database is not empty"};
+ }
+
+ version_ = -1;  // the accompanying Go test expects "clusterx version" to answer -1 after a reset
+ size_ = 0;
+ myid_.clear();
+ myself_.reset();
+
+ nodes_.clear();
+ for (auto &n : slots_nodes_) {  // drop every slot -> node mapping
+ n = nullptr;
+ }
+ migrated_slots_.clear();
+ imported_slots_.clear();
+
+ // Remove the persisted cluster nodes file, if it exists.
+ unlink(srv_->GetConfig()->NodesFilePath().data());  // NOTE(review): return value is ignored, so a failed removal is not reported to the caller
+ return Status::OK();
+}
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 0afc832e..79e0d38e 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -89,6 +89,7 @@ class Cluster {
std::string GetMyId() const { return myid_; }
Status DumpClusterNodes(const std::string &file);
Status LoadClusterNodes(const std::string &file_path);
+ Status Reset();
static bool SubCommandIsExecExclusive(const std::string &subcommand);
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index bd8de277..9567dcd5 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -32,7 +32,8 @@ class CommandCluster : public Commander {
Status Parse(const std::vector<std::string> &args) override {
subcommand_ = util::ToLower(args[1]);
- if (args.size() == 2 && (subcommand_ == "nodes" || subcommand_ == "slots"
|| subcommand_ == "info"))
+ if (args.size() == 2 &&
+ (subcommand_ == "nodes" || subcommand_ == "slots" || subcommand_ ==
"info" || subcommand_ == "reset"))
return Status::OK();
if (subcommand_ == "keyslot" && args_.size() == 3) return Status::OK();
@@ -48,7 +49,7 @@ class CommandCluster : public Commander {
return Status::OK();
}
- return {Status::RedisParseErr, "CLUSTER command, CLUSTER
INFO|NODES|SLOTS|KEYSLOT"};
+ return {Status::RedisParseErr, "CLUSTER command, CLUSTER
INFO|NODES|SLOTS|KEYSLOT|RESET"};
}
Status Execute(Server *srv, Connection *conn, std::string *output) override {
@@ -105,6 +106,13 @@ class CommandCluster : public Commander {
} else {
return {Status::RedisExecErr, s.Msg()};
}
+ } else if (subcommand_ == "reset") {
+ Status s = srv->cluster->Reset();
+ if (s.IsOK()) {
+ *output = redis::SimpleString("OK");
+ } else {
+ return {Status::RedisExecErr, s.Msg()};
+ }
} else {
return {Status::RedisExecErr, "Invalid cluster command options"};
}
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index c21b8469..3074664e 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -521,6 +521,19 @@ Status Storage::RestoreFromCheckpoint() {
return Status::OK();
}
+bool Storage::IsEmptyDB() {  // Returns true when no entry in the metadata column family is either undecodable or unexpired.
+ std::unique_ptr<rocksdb::Iterator> iter(
+ db_->NewIterator(rocksdb::ReadOptions(),
GetCFHandle(kMetadataColumnFamilyName)));  // only the metadata column family is scanned
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {  // entries whose metadata decodes and is expired are skipped
+ Metadata metadata(kRedisNone, false);
+ // If the metadata cannot be decoded, treat the key as alive, so the db is
not empty
+ if (!metadata.Decode(iter->value()).ok() || !metadata.Expired()) {
+ return false;  // first live (or undecodable) entry ends the scan
+ }
+ }
+ return true;  // NOTE(review): iter->status() is never inspected; if the iterator stops on a read error this would presumably still report "empty" — confirm
+}
+
void Storage::EmptyDB() {
// Clean old backups and checkpoints
PurgeOldBackups(0, 0);
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 3627ad64..44888f1c 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -130,6 +130,7 @@ class Storage {
void SetWriteOptions(const Config::RocksDB::WriteOptions &config);
Status Open(DBOpenMode mode = kDBOpenModeDefault);
void CloseDB();
+ bool IsEmptyDB();
void EmptyDB();
rocksdb::BlockBasedTableOptions InitTableOptions();
void SetBlobDB(rocksdb::ColumnFamilyOptions *cf_options);
diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go
index 97c8ed88..3c4e27fe 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -24,6 +24,7 @@ import (
"fmt"
"strings"
"testing"
+ "time"
"github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
@@ -398,3 +399,82 @@ func TestClusterMultiple(t *testing.T) {
util.ErrorRegexp(t, rdb[3].Get(ctx,
util.SlotTable[8192]).Err(), fmt.Sprintf("MOVED 8192.*%d.*", srv[2].Port()))
})
}
+
+func TestClusterReset(t *testing.T) {  // Checks that CLUSTER RESET is rejected while the DB has data, while importing and while migrating, and succeeds otherwise.
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv0.Close() }()
+ rdb0 := srv0.NewClientWithOption(&redis.Options{PoolSize: 1})
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv1.Close() }()
+ rdb1 := srv1.NewClientWithOption(&redis.Options{PoolSize: 1})
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", id0,
srv0.Host(), srv0.Port())  // srv0 is listed as master of slots 0-8191
+ clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383", id1,
srv1.Host(), srv1.Port())  // srv1 is listed as master of slots 8192-16383
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())  // the trailing "1" is presumably the topology version — confirm
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
+
+ t.Run("cannot reset cluster if the db is not empty", func(t *testing.T)
{
+ key := util.SlotTable[0]
+ require.NoError(t, rdb0.Set(ctx, key, "value", 0).Err())
+ require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(),
"Can't reset cluster while database is not empty")  // reset must be refused while the key exists
+ require.NoError(t, rdb0.Del(ctx, key).Err())
+ require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())  // with the key deleted the reset is expected to succeed
+ require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx",
"version").Val())
+ // reset the cluster topology to avoid breaking other test cases
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())
+ })
+
+ t.Run("cannot reset cluster if the db is importing the slot", func(t
*testing.T) {
+ slotNum := 1
+ require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import",
slotNum, 0).Val())  // the cluster info checked below reports import_state: start after this call
+ clusterInfo := rdb1.ClusterInfo(ctx).Val()
+ require.Contains(t, clusterInfo, "importing_slot: 1")
+ require.Contains(t, clusterInfo, "import_state: start")
+ require.Contains(t, rdb1.Do(ctx, "cluster", "reset").Err(),
"Can't reset cluster while importing slot")
+ require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import",
slotNum, 1).Val())  // the cluster info checked below reports import_state: success after this call
+ clusterInfo = rdb1.ClusterInfo(ctx).Val()
+ require.Contains(t, clusterInfo, "import_state: success")
+ require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())  // NOTE(review): this resets rdb0, while the import above ran on rdb1 — confirm this is intended
+ require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx",
"version").Val())
+ // reset the cluster topology to avoid breaking other test cases
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())
+ })
+
+ t.Run("cannot reset cluster if the db is migrating the slot", func(t
*testing.T) {
+ slotNum := 2
+ // slow down the migration speed to avoid breaking other test
cases
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed",
"128").Err())
+ for i := 0; i < 1024; i++ {
+ require.NoError(t, rdb0.RPush(ctx, "my-list",
fmt.Sprintf("element%d", i)).Err())  // 1024 list elements give the migration some data to move
+ }
+
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
slotNum, id1).Val())
+ clusterInfo := rdb0.ClusterInfo(ctx).Val()
+ require.Contains(t, clusterInfo, "migrating_slot: 2")
+ require.Contains(t, clusterInfo, "migrating_state: start")
+ require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(),
"Can't reset cluster while migrating slot")
+
+ // wait for the migration to finish
+ require.Eventually(t, func() bool {
+ clusterInfo :=
rdb0.ClusterInfo(context.Background()).Val()
+ return strings.Contains(clusterInfo,
fmt.Sprintf("migrating_state: %s", "success"))
+ }, 10*time.Second, 100*time.Millisecond)  // polls every 100ms for up to 10s
+ // Need to flush keys in the source node since the success
migration will not mean
+ // the keys are removed from the source node right now.
+ require.NoError(t, rdb0.FlushAll(ctx).Err())
+
+ require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+ require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx",
"version").Val())
+ // reset the cluster topology to avoid breaking other test cases
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())
+ })
+}