This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 6aabd805 Add support of the CLUSTER REPLICAS command (#2244)
6aabd805 is described below
commit 6aabd8050a7d2cb26dee78906d448514f79fff26
Author: Hyeonho Kim <[email protected]>
AuthorDate: Sat Apr 13 18:39:10 2024 +0900
Add support of the CLUSTER REPLICAS command (#2244)
---
src/cluster/cluster.cc | 42 +++++++++++++++++
src/cluster/cluster.h | 1 +
src/commands/cmd_cluster.cc | 12 ++++-
tests/cppunit/cluster_test.cc | 46 +++++++++++++++++++
tests/gocase/integration/cluster/cluster_test.go | 58 +++++++++++++++++++++++-
5 files changed, 157 insertions(+), 2 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 8e16270c..72a7f15a 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -469,6 +469,48 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) {
return Status::OK();
}
+// Builds the textual description of every replica attached to the master
+// identified by `node_id`, one replica per '\n'-terminated line:
+//   <id> <host>:<port>@<port + kClusterPortIncr> slave <master-id> <ping-sent> <pong-recv> <epoch> connected
+// Returns ClusterDown while version_ is negative, and InvalidArgument when
+// `node_id` is not in nodes_ or does not refer to a master. A master without
+// replicas yields an empty string.
+StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
+  if (version_ < 0) return {Status::ClusterDown, errClusterNoInitialized};
+
+  const auto master_it = nodes_.find(node_id);
+  if (master_it == nodes_.end()) return {Status::InvalidArgument, errInvalidNodeID};
+
+  const auto &master = master_it->second;
+  if (master->role != kClusterMaster) return {Status::InvalidArgument, errNoMasterNode};
+
+  // A single timestamp is shared by all lines: ping-sent is reported as
+  // now - 1 and pong-received as now.
+  const auto now = util::GetTimeStampMS();
+
+  std::string result;
+  for (const auto &replica_id : master->replicas) {
+    const auto replica_it = nodes_.find(replica_id);
+    // Replica ids without an entry in nodes_ are silently skipped.
+    if (replica_it == nodes_.end()) continue;
+
+    const auto &replica = replica_it->second;
+    // ID, host, port | flags and master ID | ping sent, pong received, config epoch, link status
+    result += fmt::format("{} {}:{}@{} slave {} {} {} {} connected\n", replica_id, replica->host, replica->port,
+                          replica->port + kClusterPortIncr, node_id, now - 1, now, version_);
+  }
+
+  return result;
+}
+
std::string Cluster::getNodeIDBySlot(int slot) const {
if (slot < 0 || slot >= kClusterSlots || !slots_nodes_[slot]) return "";
return slots_nodes_[slot]->id;
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 79e0d38e..c98ea668 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -71,6 +71,7 @@ class Cluster {
explicit Cluster(Server *srv, std::vector<std::string> binds, int port);
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool
force);
Status GetClusterNodes(std::string *nodes_str);
+ StatusOr<std::string> GetReplicas(const std::string &node_id);
Status SetNodeId(const std::string &node_id);
Status SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const
std::string &node_id, int64_t version);
Status SetSlotMigrated(int slot, const std::string &ip_port);
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index 5831fa50..04382ac1 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -49,7 +49,9 @@ class CommandCluster : public Commander {
return Status::OK();
}
- return {Status::RedisParseErr, "CLUSTER command, CLUSTER
INFO|NODES|SLOTS|KEYSLOT|RESET"};
+ if (subcommand_ == "replicas" && args_.size() == 3) return Status::OK();
+
+ return {Status::RedisParseErr, "CLUSTER command, CLUSTER
INFO|NODES|SLOTS|KEYSLOT|RESET|REPLICAS"};
}
Status Execute(Server *srv, Connection *conn, std::string *output) override {
@@ -113,6 +115,14 @@ class CommandCluster : public Commander {
} else {
return {Status::RedisExecErr, s.Msg()};
}
+ } else if (subcommand_ == "replicas") {
+ auto node_id = args_[2];
+ StatusOr<std::string> s = srv->cluster->GetReplicas(node_id);
+ if (s.IsOK()) {
+ *output = conn->VerbatimString("txt", s.GetValue());
+ } else {
+ return {Status::RedisExecErr, s.Msg()};
+ }
} else {
return {Status::RedisExecErr, "Invalid cluster command options"};
}
diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc
index 810a1ca1..e70c3136 100644
--- a/tests/cppunit/cluster_test.cc
+++ b/tests/cppunit/cluster_test.cc
@@ -340,3 +340,49 @@ TEST_F(ClusterTest, ClusterParseSlotRanges) {
slots.clear();
}
}
+
+// Exercises Cluster::GetReplicas against a three-node topology: a master with
+// one replica, a master without replicas, a replica node id (rejected) and an
+// unknown node id (rejected).
+TEST_F(ClusterTest, GetReplicas) {
+  auto config = storage_->GetConfig();
+  // don't start workers
+  config->workers = 0;
+  Server server(storage_.get(), config);
+  // we don't need the server resource, so just stop it once it's started
+  server.Stop();
+  server.Join();
+
+  const std::string nodes =
+      "7dbee3d628f04cc5d763b36e92b10533e627a1d0 127.0.0.1 6480 slave 159dde1194ebf5bfc5a293dff839c3d1476f2a49\n"
+      "159dde1194ebf5bfc5a293dff839c3d1476f2a49 127.0.0.1 6479 master - 8192-16383\n"
+      "bb2e5b3c5282086df51eff6b3e35519aede96fa6 127.0.0.1 6379 master - 0-8191";
+
+  Cluster cluster(&server, {"127.0.0.1"}, 6379);
+  Status s = cluster.SetClusterNodes(nodes, 2, false);
+  ASSERT_TRUE(s.IsOK());
+
+  auto with_replica = cluster.GetReplicas("159dde1194ebf5bfc5a293dff839c3d1476f2a49");
+  // Assert on the GetReplicas result itself; `s` only reflects SetClusterNodes.
+  ASSERT_TRUE(with_replica.IsOK());
+
+  std::vector<std::string> replicas = util::Split(with_replica.GetValue(), "\n");
+  // The topology above gives this master exactly one replica. Checking the
+  // count keeps the loop below from passing vacuously on an empty reply.
+  ASSERT_EQ(replicas.size(), 1U);
+  for (const auto &replica : replicas) {
+    std::vector<std::string> replica_fields = util::Split(replica, " ");
+
+    // id, host:port@cport, flags, master id, ping sent, pong received, epoch, link status
+    ASSERT_EQ(replica_fields.size(), 8U);
+    ASSERT_EQ(replica_fields[0], "7dbee3d628f04cc5d763b36e92b10533e627a1d0");
+    ASSERT_EQ(replica_fields[1], "127.0.0.1:6480@16480");
+    ASSERT_EQ(replica_fields[2], "slave");
+    ASSERT_EQ(replica_fields[3], "159dde1194ebf5bfc5a293dff839c3d1476f2a49");
+    // Fields 4-6 (timestamps and epoch) are not asserted here.
+    ASSERT_EQ(replica_fields[7], "connected");
+  }
+
+  auto without_replica = cluster.GetReplicas("bb2e5b3c5282086df51eff6b3e35519aede96fa6");
+  ASSERT_TRUE(without_replica.IsOK());
+  ASSERT_EQ(without_replica.GetValue(), "");
+
+  auto replica_node = cluster.GetReplicas("7dbee3d628f04cc5d763b36e92b10533e627a1d0");
+  ASSERT_FALSE(replica_node.IsOK());
+  ASSERT_EQ(replica_node.Msg(), "The node isn't a master");
+
+  auto unknown_node = cluster.GetReplicas("1234567890");
+  ASSERT_FALSE(unknown_node.IsOK());
+  ASSERT_EQ(unknown_node.Msg(), "Invalid cluster node id");
+}
diff --git a/tests/gocase/integration/cluster/cluster_test.go
b/tests/gocase/integration/cluster/cluster_test.go
index 3c4e27fe..a26dc74a 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -26,9 +26,10 @@ import (
"testing"
"time"
- "github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
)
func TestDisableCluster(t *testing.T) {
@@ -131,6 +132,61 @@ func TestClusterNodes(t *testing.T) {
})
}
+// TestClusterReplicas checks the CLUSTER REPLICAS reply for a master that has
+// one replica, a master that has none, a replica node id and an unknown node id.
+func TestClusterReplicas(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6"
+	master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49"
+	replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0"
+
+	// All three entries point at the single test server; only the ids, roles
+	// and slot ranges differ.
+	nodes := strings.Join([]string{
+		fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv.Host(), srv.Port()),
+		fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv.Host(), srv.Port()),
+		fmt.Sprintf("%s %s %d slave %s", replica2ID, srv.Host(), srv.Port(), master2ID),
+	}, "\n")
+
+	require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
+	require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val())
+
+	t.Run("with replicas", func(t *testing.T) {
+		replicas, err := rdb.Do(ctx, "cluster", "replicas", master2ID).Text()
+		require.NoError(t, err)
+		// id, host:port@cport, flags, master id, ping sent, pong received, epoch, link status
+		fields := strings.Split(replicas, " ")
+		require.Len(t, fields, 8)
+		require.Equal(t, replica2ID, fields[0])
+		require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), srv.Port()+10000), fields[1])
+		require.Equal(t, "slave", fields[2])
+		require.Equal(t, master2ID, fields[3])
+		// The reply line is newline-terminated, so the last field keeps the "\n".
+		require.Equal(t, "connected\n", fields[7])
+	})
+
+	t.Run("without replicas", func(t *testing.T) {
+		replicas, err := rdb.Do(ctx, "cluster", "replicas", master1ID).Text()
+		require.NoError(t, err)
+		require.Empty(t, replicas)
+	})
+
+	t.Run("send command to replica", func(t *testing.T) {
+		err := rdb.Do(ctx, "cluster", "replicas", replica2ID).Err()
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "The node isn't a master")
+	})
+
+	t.Run("unknown node", func(t *testing.T) {
+		err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err()
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "Invalid cluster node id")
+	})
+}
+
func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {
srv1 := util.StartServer(t, map[string]string{
"bind": "0.0.0.0",