This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 20e01c45 Add support of READONLY and READWRITE (#2173)
20e01c45 is described below
commit 20e01c454bbebbefe312d5906bd147a70a058cd0
Author: Yuhui Liu <[email protected]>
AuthorDate: Sun Mar 17 21:54:37 2024 +0800
Add support of READONLY and READWRITE (#2173)
Co-authored-by: 纪华裕 <[email protected]>
---
src/cluster/cluster.cc | 3 ++-
src/commands/cmd_cluster.cc | 24 ++++++++++++++++++++++--
src/server/redis_connection.h | 1 +
tests/gocase/integration/cluster/cluster_test.go | 19 +++++++++++++++++++
4 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index ff98da87..f6f3525b 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -802,7 +802,8 @@ Status Cluster::CanExecByMySelf(const
redis::CommandAttributes *attributes, cons
}
if (myself_ && myself_->role == kClusterSlave && !(attributes->flags &
redis::kCmdWrite) &&
- nodes_.find(myself_->master_id) != nodes_.end() &&
nodes_[myself_->master_id] == slots_nodes_[slot]) {
+ nodes_.find(myself_->master_id) != nodes_.end() &&
nodes_[myself_->master_id] == slots_nodes_[slot] &&
+ conn->IsFlagEnabled(redis::Connection::KReadOnly)) {
return Status::OK(); // My master is serving this slot
}
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index 96ecfbde..99a79f14 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -23,6 +23,7 @@
#include "cluster/sync_migrate_context.h"
#include "commander.h"
#include "error_constants.h"
+#include "status.h"
namespace redis {
@@ -292,8 +293,27 @@ static uint64_t GenerateClusterFlag(const
std::vector<std::string> &args) {
return 0;
}
+class CommandReadOnly : public Commander {  // Handler registered below as the "readonly" command.
+ public:
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {  // srv is not used by this handler.
+ *output = redis::SimpleString("OK");  // The reply is always the simple string OK.
+ conn->EnableFlag(redis::Connection::KReadOnly);  // Per-connection flag; Cluster::CanExecByMySelf requires it before a slave serves non-write commands for a slot owned by its master.
+ return Status::OK();
+ }
+};
+
+class CommandReadWrite : public Commander {  // Handler registered below as the "readwrite" command.
+ public:
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {  // srv is not used by this handler.
+ *output = redis::SimpleString("OK");  // The reply is always the simple string OK.
+ conn->DisableFlag(redis::Connection::KReadOnly);  // Clears the per-connection flag that the "readonly" command sets.
+ return Status::OK();
+ }
+};
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster
no-script", 0, 0, 0, GenerateClusterFlag),
- MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster
no-script", 0, 0, 0,
- GenerateClusterFlag), )
+ MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster
no-script", 0, 0, 0, GenerateClusterFlag),
+ MakeCmdAttr<CommandReadOnly>("readonly", 1, "cluster
no-multi", 0, 0, 0),
+ MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster
no-multi", 0, 0, 0), )
} // namespace redis
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 506f30a9..79b9dd18 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -45,6 +45,7 @@ class Connection : public EvbufCallbackBase<Connection> {
kCloseAfterReply = 1 << 6,
kCloseAsync = 1 << 7,
kMultiExec = 1 << 8,
+ KReadOnly = 1 << 9,
};
explicit Connection(bufferevent *bev, Worker *owner);
diff --git a/tests/gocase/integration/cluster/cluster_test.go
b/tests/gocase/integration/cluster/cluster_test.go
index 8bc42fda..695eb33c 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -334,6 +334,11 @@ func TestClusterMultiple(t *testing.T) {
require.ErrorContains(t, rdb[3].Set(ctx, util.SlotTable[16383],
16383, 0).Err(), "MOVED")
// request a read-only command to node3 that serve slot 16383,
that's ok
util.WaitForOffsetSync(t, rdb[2], rdb[3])
+ // the default option is READWRITE, which will redirect both read and write to master
+ require.ErrorContains(t, rdb[3].Get(ctx,
util.SlotTable[16383]).Err(), "MOVED")
+
+ require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())
+
require.Equal(t, "16383", rdb[3].Get(ctx,
util.SlotTable[16383]).Val())
})
@@ -369,4 +374,18 @@ func TestClusterMultiple(t *testing.T) {
require.ErrorContains(t, rdb[1].Do(ctx, "EXEC").Err(),
"EXECABORT")
require.Equal(t, "no-multi", rdb[1].Get(ctx,
util.SlotTable[0]).Val())
})
+
+ t.Run("requests on cluster are ok when enable readonly", func(t
*testing.T) {
+
+ require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())
+ require.NoError(t, rdb[2].Set(ctx, util.SlotTable[8192], 8192,
0).Err())
+ util.WaitForOffsetSync(t, rdb[2], rdb[3])
+ // request node3 that serves slot 8192, that's ok
+ require.Equal(t, "8192", rdb[3].Get(ctx,
util.SlotTable[8192]).Val())
+
+ require.NoError(t, rdb[3].Do(ctx, "READWRITE").Err())
+
+ // when READWRITE is enabled, request node3 that serves slot 8192, that's not ok
+ util.ErrorRegexp(t, rdb[3].Get(ctx,
util.SlotTable[8192]).Err(), fmt.Sprintf("MOVED 8192.*%d.*", srv[2].Port()))
+ })
}