This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 20e01c45 Add support of READONLY and READWRITE (#2173)
20e01c45 is described below

commit 20e01c454bbebbefe312d5906bd147a70a058cd0
Author: Yuhui Liu <[email protected]>
AuthorDate: Sun Mar 17 21:54:37 2024 +0800

    Add support of READONLY and READWRITE (#2173)
    
    Co-authored-by: 纪华裕 <[email protected]>
---
 src/cluster/cluster.cc                           |  3 ++-
 src/commands/cmd_cluster.cc                      | 24 ++++++++++++++++++++++--
 src/server/redis_connection.h                    |  1 +
 tests/gocase/integration/cluster/cluster_test.go | 19 +++++++++++++++++++
 4 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index ff98da87..f6f3525b 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -802,7 +802,8 @@ Status Cluster::CanExecByMySelf(const 
redis::CommandAttributes *attributes, cons
   }
 
   if (myself_ && myself_->role == kClusterSlave && !(attributes->flags & 
redis::kCmdWrite) &&
-      nodes_.find(myself_->master_id) != nodes_.end() && 
nodes_[myself_->master_id] == slots_nodes_[slot]) {
+      nodes_.find(myself_->master_id) != nodes_.end() && 
nodes_[myself_->master_id] == slots_nodes_[slot] &&
+      conn->IsFlagEnabled(redis::Connection::KReadOnly)) {
     return Status::OK();  // My master is serving this slot
   }
 
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index 96ecfbde..99a79f14 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -23,6 +23,7 @@
 #include "cluster/sync_migrate_context.h"
 #include "commander.h"
 #include "error_constants.h"
+#include "status.h"
 
 namespace redis {
 
@@ -292,8 +293,27 @@ static uint64_t GenerateClusterFlag(const 
std::vector<std::string> &args) {
   return 0;
 }
 
+class CommandReadOnly : public Commander {
+ public:
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    *output = redis::SimpleString("OK");
+    conn->EnableFlag(redis::Connection::KReadOnly);
+    return Status::OK();
+  }
+};
+
+class CommandReadWrite : public Commander {
+ public:
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    *output = redis::SimpleString("OK");
+    conn->DisableFlag(redis::Connection::KReadOnly);
+    return Status::OK();
+  }
+};
+
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster 
no-script", 0, 0, 0, GenerateClusterFlag),
-                        MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster 
no-script", 0, 0, 0,
-                                                     GenerateClusterFlag), )
+                        MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster 
no-script", 0, 0, 0, GenerateClusterFlag),
+                        MakeCmdAttr<CommandReadOnly>("readonly", 1, "cluster 
no-multi", 0, 0, 0),
+                        MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster 
no-multi", 0, 0, 0), )
 
 }  // namespace redis
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 506f30a9..79b9dd18 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -45,6 +45,7 @@ class Connection : public EvbufCallbackBase<Connection> {
     kCloseAfterReply = 1 << 6,
     kCloseAsync = 1 << 7,
     kMultiExec = 1 << 8,
+    KReadOnly = 1 << 9,
   };
 
   explicit Connection(bufferevent *bev, Worker *owner);
diff --git a/tests/gocase/integration/cluster/cluster_test.go 
b/tests/gocase/integration/cluster/cluster_test.go
index 8bc42fda..695eb33c 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -334,6 +334,11 @@ func TestClusterMultiple(t *testing.T) {
                require.ErrorContains(t, rdb[3].Set(ctx, util.SlotTable[16383], 
16383, 0).Err(), "MOVED")
                // request a read-only command to node3 that serve slot 16383, 
that's ok
                util.WaitForOffsetSync(t, rdb[2], rdb[3])
+               //the default option is READWRITE, which will redirect both 
read and write to master
+               require.ErrorContains(t, rdb[3].Get(ctx, 
util.SlotTable[16383]).Err(), "MOVED")
+
+               require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())
+
                require.Equal(t, "16383", rdb[3].Get(ctx, 
util.SlotTable[16383]).Val())
        })
 
@@ -369,4 +374,18 @@ func TestClusterMultiple(t *testing.T) {
                require.ErrorContains(t, rdb[1].Do(ctx, "EXEC").Err(), 
"EXECABORT")
                require.Equal(t, "no-multi", rdb[1].Get(ctx, 
util.SlotTable[0]).Val())
        })
+
+       t.Run("requests on cluster are ok when enable readonly", func(t 
*testing.T) {
+
+               require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())
+               require.NoError(t, rdb[2].Set(ctx, util.SlotTable[8192], 8192, 
0).Err())
+               util.WaitForOffsetSync(t, rdb[2], rdb[3])
+               // request node3 that serves slot 8192, that's ok
+               require.Equal(t, "8192", rdb[3].Get(ctx, 
util.SlotTable[8192]).Val())
+
+               require.NoError(t, rdb[3].Do(ctx, "READWRITE").Err())
+
+               // when enable READWRITE, request node3 that serves slot 8192, 
that's not ok
+               util.ErrorRegexp(t, rdb[3].Get(ctx, 
util.SlotTable[8192]).Err(), fmt.Sprintf("MOVED 8192.*%d.*", srv[2].Port()))
+       })
 }

Reply via email to