This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 6aabd805 Add support of the CLUSTER REPLICAS command (#2244)
6aabd805 is described below

commit 6aabd8050a7d2cb26dee78906d448514f79fff26
Author: Hyeonho Kim <[email protected]>
AuthorDate: Sat Apr 13 18:39:10 2024 +0900

    Add support of the CLUSTER REPLICAS command (#2244)
---
 src/cluster/cluster.cc                           | 42 +++++++++++++++++
 src/cluster/cluster.h                            |  1 +
 src/commands/cmd_cluster.cc                      | 12 ++++-
 tests/cppunit/cluster_test.cc                    | 46 +++++++++++++++++++
 tests/gocase/integration/cluster/cluster_test.go | 58 +++++++++++++++++++++++-
 5 files changed, 157 insertions(+), 2 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 8e16270c..72a7f15a 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -469,6 +469,48 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) {
   return Status::OK();
 }
 
+// Describes every replica of master `node_id`, one "\n"-terminated line per replica:
+//   <id> <host>:<port>@<port + kClusterPortIncr> slave <node_id> <now - 1> <now> <version_> connected
+// Errors: ClusterDown if version_ < 0; InvalidArgument if node_id is unknown or is not a master.
+StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
+  if (version_ < 0) {
+    return {Status::ClusterDown, errClusterNoInitialized};
+  }
+
+  auto item = nodes_.find(node_id);
+  if (item == nodes_.end()) {
+    return {Status::InvalidArgument, errInvalidNodeID};
+  }
+
+  const auto &node = item->second;  // reference the map entry instead of copying it
+  if (node->role != kClusterMaster) {
+    return {Status::InvalidArgument, errNoMasterNode};
+  }
+
+  auto now = util::GetTimeStampMS();
+  std::string replicas_desc;
+  for (const auto &replica_id : node->replicas) {
+    auto n = nodes_.find(replica_id);
+    if (n == nodes_.end()) {
+      continue;  // this replica id has no entry in nodes_, so there is nothing to describe
+    }
+    const auto &replica = n->second;
+
+    std::string node_str;
+    // ID, host, port (the second port is the node port plus kClusterPortIncr)
+    node_str.append(
+        fmt::format("{} {}:{}@{} ", replica_id, replica->host, replica->port, replica->port + kClusterPortIncr));
+    // Flags: every entry is reported as a slave of the requested master
+    node_str.append(fmt::format("slave {} ", node_id));
+    // Ping sent, pong received, config epoch, link status (filled from the current time and version_)
+    node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));
+
+    replicas_desc.append(node_str + "\n");
+  }
+
+  return replicas_desc;
+}
+
 std::string Cluster::getNodeIDBySlot(int slot) const {
   if (slot < 0 || slot >= kClusterSlots || !slots_nodes_[slot]) return "";
   return slots_nodes_[slot]->id;
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 79e0d38e..c98ea668 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -71,6 +71,7 @@ class Cluster {
   explicit Cluster(Server *srv, std::vector<std::string> binds, int port);
   Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
   Status GetClusterNodes(std::string *nodes_str);
+  StatusOr<std::string> GetReplicas(const std::string &node_id);
   Status SetNodeId(const std::string &node_id);
   Status SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const std::string &node_id, int64_t version);
   Status SetSlotMigrated(int slot, const std::string &ip_port);
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index 5831fa50..04382ac1 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -49,7 +49,9 @@ class CommandCluster : public Commander {
       return Status::OK();
     }
 
-    return {Status::RedisParseErr, "CLUSTER command, CLUSTER INFO|NODES|SLOTS|KEYSLOT|RESET"};
+    if (subcommand_ == "replicas" && args_.size() == 3) return Status::OK();
+
+    return {Status::RedisParseErr, "CLUSTER command, CLUSTER INFO|NODES|SLOTS|KEYSLOT|RESET|REPLICAS"};
   }
 
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
@@ -113,6 +115,14 @@ class CommandCluster : public Commander {
       } else {
         return {Status::RedisExecErr, s.Msg()};
       }
+    } else if (subcommand_ == "replicas") {
+      auto node_id = args_[2];
+      StatusOr<std::string> s = srv->cluster->GetReplicas(node_id);
+      if (s.IsOK()) {
+        *output = conn->VerbatimString("txt", s.GetValue());
+      } else {
+        return {Status::RedisExecErr, s.Msg()};
+      }
     } else {
       return {Status::RedisExecErr, "Invalid cluster command options"};
     }
diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc
index 810a1ca1..e70c3136 100644
--- a/tests/cppunit/cluster_test.cc
+++ b/tests/cppunit/cluster_test.cc
@@ -340,3 +340,49 @@ TEST_F(ClusterTest, ClusterParseSlotRanges) {
     slots.clear();
   }
 }
+
+// Exercises Cluster::GetReplicas on two masters (ports 6479 and 6379) where only 6479 has a replica (6480):
+// a master with a replica, a master without one, a replica's own id, and an unknown id.
+TEST_F(ClusterTest, GetReplicas) {
+  auto config = storage_->GetConfig();
+  // don't start workers
+  config->workers = 0;
+  Server server(storage_.get(), config);
+  // we don't need the server resource, so just stop it once it's started
+  server.Stop();
+  server.Join();
+
+  const std::string nodes =
+      "7dbee3d628f04cc5d763b36e92b10533e627a1d0 127.0.0.1 6480 slave 159dde1194ebf5bfc5a293dff839c3d1476f2a49\n"
+      "159dde1194ebf5bfc5a293dff839c3d1476f2a49 127.0.0.1 6479 master - 8192-16383\n"
+      "bb2e5b3c5282086df51eff6b3e35519aede96fa6 127.0.0.1 6379 master - 0-8191";
+
+  Cluster cluster(&server, {"127.0.0.1"}, 6379);
+  Status s = cluster.SetClusterNodes(nodes, 2, false);
+  ASSERT_TRUE(s.IsOK());
+
+  auto with_replica = cluster.GetReplicas("159dde1194ebf5bfc5a293dff839c3d1476f2a49");
+  ASSERT_TRUE(with_replica.IsOK());  // check this call's result, not the earlier SetClusterNodes status
+  std::vector<std::string> replicas = util::Split(with_replica.GetValue(), "\n");
+  ASSERT_FALSE(replicas.empty());  // otherwise the loop below would pass without checking anything
+  for (const auto &replica : replicas) {
+    std::vector<std::string> replica_fields = util::Split(replica, " ");
+    ASSERT_TRUE(replica_fields.size() == 8);
+    ASSERT_TRUE(replica_fields[0] == "7dbee3d628f04cc5d763b36e92b10533e627a1d0");
+    ASSERT_TRUE(replica_fields[1] == "127.0.0.1:6480@16480");
+    ASSERT_TRUE(replica_fields[2] == "slave");
+    ASSERT_TRUE(replica_fields[3] == "159dde1194ebf5bfc5a293dff839c3d1476f2a49");
+    ASSERT_TRUE(replica_fields[7] == "connected");
+  }
+
+  auto without_replica = cluster.GetReplicas("bb2e5b3c5282086df51eff6b3e35519aede96fa6");
+  ASSERT_TRUE(without_replica.IsOK());
+  ASSERT_EQ(without_replica.GetValue(), "");
+
+  auto replica_node = cluster.GetReplicas("7dbee3d628f04cc5d763b36e92b10533e627a1d0");
+  ASSERT_FALSE(replica_node.IsOK());
+  ASSERT_EQ(replica_node.Msg(), "The node isn't a master");
+
+  auto unknown_node = cluster.GetReplicas("1234567890");
+  ASSERT_FALSE(unknown_node.IsOK());
+  ASSERT_EQ(unknown_node.Msg(), "Invalid cluster node id");
+}
diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go
index 3c4e27fe..a26dc74a 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -26,9 +26,10 @@ import (
        "testing"
        "time"
 
-       "github.com/apache/kvrocks/tests/gocase/util"
        "github.com/redis/go-redis/v9"
        "github.com/stretchr/testify/require"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
 )
 
 func TestDisableCluster(t *testing.T) {
@@ -131,6 +132,61 @@ func TestClusterNodes(t *testing.T) {
        })
 }
 
+// TestClusterReplicas checks CLUSTER REPLICAS on a topology of two masters and one replica of the second master.
+func TestClusterReplicas(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       // all three nodes are declared with this server's host and port; only the node ids differ
+       nodes := ""
+       master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6"
+       master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv.Host(), srv.Port())
+       nodes += master1Node + "\n"
+       master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49"
+       master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv.Host(), srv.Port())
+       nodes += master2Node + "\n"
+       replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0"
+       replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv.Host(), srv.Port(), master2ID)
+       nodes += replica2Node
+
+       require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
+       require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val())
+
+       t.Run("with replicas", func(t *testing.T) {
+               replicas, err := rdb.Do(ctx, "cluster", "replicas", master2ID).Text()
+               require.NoError(t, err)
+               fields := strings.Split(replicas, " ")
+               require.Len(t, fields, 8)
+               require.Equal(t, replica2ID, fields[0])
+               require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), srv.Port()+10000), fields[1])
+               require.Equal(t, "slave", fields[2])
+               require.Equal(t, master2ID, fields[3])
+               require.Equal(t, "connected\n", fields[7]) // the single reply line keeps its trailing newline
+       })
+
+       t.Run("without replicas", func(t *testing.T) {
+               replicas, err := rdb.Do(ctx, "cluster", "replicas", master1ID).Text()
+               require.NoError(t, err)
+               require.Empty(t, replicas)
+       })
+
+       t.Run("send command to replica", func(t *testing.T) {
+               err := rdb.Do(ctx, "cluster", "replicas", replica2ID).Err() // a replica's id is not a master id
+               require.Error(t, err)
+               require.Contains(t, err.Error(), "The node isn't a master")
+       })
+
+       t.Run("unknown node", func(t *testing.T) {
+               err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err()
+               require.Error(t, err)
+               require.Contains(t, err.Error(), "Invalid cluster node id")
+       })
+}
+
 func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {
        srv1 := util.StartServer(t, map[string]string{
                "bind":            "0.0.0.0",

Reply via email to