This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 5c8b6cba Add support of the CLUSTER RESET command (#2210)
5c8b6cba is described below

commit 5c8b6cbab40a6d01fea7087fb0220a1cc8caa9e3
Author: hulk <[email protected]>
AuthorDate: Mon Apr 1 11:55:36 2024 +0800

    Add support of the CLUSTER RESET command (#2210)
---
 src/cluster/cluster.cc                           | 30 ++++++++-
 src/cluster/cluster.h                            |  1 +
 src/commands/cmd_cluster.cc                      | 12 +++-
 src/storage/storage.cc                           | 13 ++++
 src/storage/storage.h                            |  1 +
 tests/gocase/integration/cluster/cluster_test.go | 80 ++++++++++++++++++++++++
 6 files changed, 134 insertions(+), 3 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 686b63a0..8e16270c 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -53,7 +53,7 @@ Cluster::Cluster(Server *srv, std::vector<std::string> binds, 
int port)
 // cluster data, so these commands should be executed exclusively, and 
ReadWriteLock
 // also can guarantee accessing data is safe.
 bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {
-  for (auto v : {"setnodes", "setnodeid", "setslot", "import"}) {
+  for (auto v : {"setnodes", "setnodeid", "setslot", "import", "reset"}) {
     if (util::EqualICase(v, subcommand)) return true;
   }
   return false;
@@ -828,3 +828,31 @@ Status Cluster::CanExecByMySelf(const 
redis::CommandAttributes *attributes, cons
   return {Status::RedisExecErr,
           fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host, 
slots_nodes_[slot]->port)};
 }
+
+Status Cluster::Reset() {
+  if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) {
+    return {Status::NotOK, "Can't reset cluster while migrating slot"};
+  }
+  if (srv_->slot_import && srv_->slot_import->GetSlot() != -1) {
+    return {Status::NotOK, "Can't reset cluster while importing slot"};
+  }
+  if (!srv_->storage->IsEmptyDB()) {
+    return {Status::NotOK, "Can't reset cluster while database is not empty"};
+  }
+
+  version_ = -1;
+  size_ = 0;
+  myid_.clear();
+  myself_.reset();
+
+  nodes_.clear();
+  for (auto &n : slots_nodes_) {
+    n = nullptr;
+  }
+  migrated_slots_.clear();
+  imported_slots_.clear();
+
+  // unlink the cluster nodes file if it exists
+  unlink(srv_->GetConfig()->NodesFilePath().data());
+  return Status::OK();
+}
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 0afc832e..79e0d38e 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -89,6 +89,7 @@ class Cluster {
   std::string GetMyId() const { return myid_; }
   Status DumpClusterNodes(const std::string &file);
   Status LoadClusterNodes(const std::string &file_path);
+  Status Reset();
 
   static bool SubCommandIsExecExclusive(const std::string &subcommand);
 
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index bd8de277..9567dcd5 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -32,7 +32,8 @@ class CommandCluster : public Commander {
   Status Parse(const std::vector<std::string> &args) override {
     subcommand_ = util::ToLower(args[1]);
 
-    if (args.size() == 2 && (subcommand_ == "nodes" || subcommand_ == "slots" 
|| subcommand_ == "info"))
+    if (args.size() == 2 &&
+        (subcommand_ == "nodes" || subcommand_ == "slots" || subcommand_ == 
"info" || subcommand_ == "reset"))
       return Status::OK();
 
     if (subcommand_ == "keyslot" && args_.size() == 3) return Status::OK();
@@ -48,7 +49,7 @@ class CommandCluster : public Commander {
       return Status::OK();
     }
 
-    return {Status::RedisParseErr, "CLUSTER command, CLUSTER 
INFO|NODES|SLOTS|KEYSLOT"};
+    return {Status::RedisParseErr, "CLUSTER command, CLUSTER 
INFO|NODES|SLOTS|KEYSLOT|RESET"};
   }
 
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
@@ -105,6 +106,13 @@ class CommandCluster : public Commander {
       } else {
         return {Status::RedisExecErr, s.Msg()};
       }
+    } else if (subcommand_ == "reset") {
+      Status s = srv->cluster->Reset();
+      if (s.IsOK()) {
+        *output = redis::SimpleString("OK");
+      } else {
+        return {Status::RedisExecErr, s.Msg()};
+      }
     } else {
       return {Status::RedisExecErr, "Invalid cluster command options"};
     }
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index c21b8469..3074664e 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -521,6 +521,19 @@ Status Storage::RestoreFromCheckpoint() {
   return Status::OK();
 }
 
+bool Storage::IsEmptyDB() {
+  std::unique_ptr<rocksdb::Iterator> iter(
+      db_->NewIterator(rocksdb::ReadOptions(), 
GetCFHandle(kMetadataColumnFamilyName)));
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    Metadata metadata(kRedisNone, false);
+  // If we cannot decode the metadata, we consider the key alive, so the db is not empty
+    if (!metadata.Decode(iter->value()).ok() || !metadata.Expired()) {
+      return false;
+    }
+  }
+  return true;
+}
+
 void Storage::EmptyDB() {
   // Clean old backups and checkpoints
   PurgeOldBackups(0, 0);
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 3627ad64..44888f1c 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -130,6 +130,7 @@ class Storage {
   void SetWriteOptions(const Config::RocksDB::WriteOptions &config);
   Status Open(DBOpenMode mode = kDBOpenModeDefault);
   void CloseDB();
+  bool IsEmptyDB();
   void EmptyDB();
   rocksdb::BlockBasedTableOptions InitTableOptions();
   void SetBlobDB(rocksdb::ColumnFamilyOptions *cf_options);
diff --git a/tests/gocase/integration/cluster/cluster_test.go 
b/tests/gocase/integration/cluster/cluster_test.go
index 97c8ed88..3c4e27fe 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -24,6 +24,7 @@ import (
        "fmt"
        "strings"
        "testing"
+       "time"
 
        "github.com/apache/kvrocks/tests/gocase/util"
        "github.com/redis/go-redis/v9"
@@ -398,3 +399,82 @@ func TestClusterMultiple(t *testing.T) {
                util.ErrorRegexp(t, rdb[3].Get(ctx, 
util.SlotTable[8192]).Err(), fmt.Sprintf("MOVED 8192.*%d.*", srv[2].Port()))
        })
 }
+
+func TestClusterReset(t *testing.T) {
+       ctx := context.Background()
+
+       srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       defer func() { srv0.Close() }()
+       rdb0 := srv0.NewClientWithOption(&redis.Options{PoolSize: 1})
+       defer func() { require.NoError(t, rdb0.Close()) }()
+       id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+       srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       defer func() { srv1.Close() }()
+       rdb1 := srv1.NewClientWithOption(&redis.Options{PoolSize: 1})
+       defer func() { require.NoError(t, rdb1.Close()) }()
+       id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", id0, 
srv0.Host(), srv0.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383", id1, 
srv1.Host(), srv1.Port())
+       require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+
+       t.Run("cannot reset cluster if the db is not empty", func(t *testing.T) 
{
+               key := util.SlotTable[0]
+               require.NoError(t, rdb0.Set(ctx, key, "value", 0).Err())
+               require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(), 
"Can't reset cluster while database is not empty")
+               require.NoError(t, rdb0.Del(ctx, key).Err())
+               require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+               require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", 
"version").Val())
+               // reset the cluster topology to avoid breaking other test cases
+               require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       })
+
+       t.Run("cannot reset cluster if the db is importing the slot", func(t 
*testing.T) {
+               slotNum := 1
+               require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
+               clusterInfo := rdb1.ClusterInfo(ctx).Val()
+               require.Contains(t, clusterInfo, "importing_slot: 1")
+               require.Contains(t, clusterInfo, "import_state: start")
+               require.Contains(t, rdb1.Do(ctx, "cluster", "reset").Err(), 
"Can't reset cluster while importing slot")
+               require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", 
slotNum, 1).Val())
+               clusterInfo = rdb1.ClusterInfo(ctx).Val()
+               require.Contains(t, clusterInfo, "import_state: success")
+               require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+               require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", 
"version").Val())
+               // reset the cluster topology to avoid breaking other test cases
+               require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       })
+
+       t.Run("cannot reset cluster if the db is migrating the slot", func(t 
*testing.T) {
+               slotNum := 2
+               // slow down the migration speed to avoid breaking other test 
cases
+               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", 
"128").Err())
+               for i := 0; i < 1024; i++ {
+                       require.NoError(t, rdb0.RPush(ctx, "my-list", 
fmt.Sprintf("element%d", i)).Err())
+               }
+
+               require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
slotNum, id1).Val())
+               clusterInfo := rdb0.ClusterInfo(ctx).Val()
+               require.Contains(t, clusterInfo, "migrating_slot: 2")
+               require.Contains(t, clusterInfo, "migrating_state: start")
+               require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(), 
"Can't reset cluster while migrating slot")
+
+               // wait for the migration to finish
+               require.Eventually(t, func() bool {
+                       clusterInfo := 
rdb0.ClusterInfo(context.Background()).Val()
+                       return strings.Contains(clusterInfo, 
fmt.Sprintf("migrating_state: %s", "success"))
+               }, 10*time.Second, 100*time.Millisecond)
+               // Need to flush keys in the source node since the success 
migration will not mean
+               // the keys are removed from the source node right now.
+               require.NoError(t, rdb0.FlushAll(ctx).Err())
+
+               require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
+               require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", 
"version").Val())
+               // reset the cluster topology to avoid breaking other test cases
+               require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       })
+}

Reply via email to