This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 3ce7c509 Add support of the ASKING command (#2273)
3ce7c509 is described below
commit 3ce7c509951107afd34405b1da53eb380865eaa1
Author: anotherJJz <[email protected]>
AuthorDate: Sun Apr 28 17:03:52 2024 +0800
Add support of the ASKING command (#2273)
---
src/cluster/cluster.cc | 5 +--
src/commands/cmd_cluster.cc | 12 ++++++-
src/server/redis_connection.cc | 6 ++++
src/server/redis_connection.h | 1 +
.../integration/slotimport/slotimport_test.go | 41 ++++++++++++++++++++++
5 files changed, 62 insertions(+), 3 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 1244ea4a..74aec5d0 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -846,12 +846,13 @@ Status Cluster::CanExecByMySelf(const
redis::CommandAttributes *attributes, cons
return Status::OK(); // I'm serving this slot
}
- if (myself_ && myself_->importing_slot == slot && conn->IsImporting()) {
+ if (myself_ && myself_->importing_slot == slot &&
+ (conn->IsImporting() ||
conn->IsFlagEnabled(redis::Connection::kAsking))) {
// While data migrating, the topology of the destination node has not been
changed.
// The destination node has to serve the requests from the migrating slot,
// although the slot is not belong to itself. Therefore, we record the
importing slot
// and mark the importing connection to accept the importing data.
- return Status::OK(); // I'm serving the importing connection
+ return Status::OK(); // I'm serving the importing connection or asking
connection
}
if (myself_ && imported_slots_.count(slot)) {
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index e371e78f..8ada4e8e 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -338,9 +338,19 @@ class CommandReadWrite : public Commander {
}
};
+class CommandAsking : public Commander {
+ public:
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {
+ conn->EnableFlag(redis::Connection::kAsking);
+ *output = redis::SimpleString("OK");
+ return Status::OK();
+ }
+};
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster
no-script", 0, 0, 0, GenerateClusterFlag),
MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster
no-script", 0, 0, 0, GenerateClusterFlag),
MakeCmdAttr<CommandReadOnly>("readonly", 1, "cluster
no-multi", 0, 0, 0),
- MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster
no-multi", 0, 0, 0), )
+ MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster
no-multi", 0, 0, 0),
+ MakeCmdAttr<CommandAsking>("asking", 1, "cluster", 0,
0, 0), )
} // namespace redis
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index a470f225..4a84f0a7 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -220,6 +220,7 @@ std::string Connection::GetFlags() const {
if (IsFlagEnabled(kSlave)) flags.append("S");
if (IsFlagEnabled(kCloseAfterReply)) flags.append("c");
if (IsFlagEnabled(kMonitor)) flags.append("M");
+ if (IsFlagEnabled(kAsking)) flags.append("A");
if (!subscribe_channels_.empty() || !subscribe_patterns_.empty())
flags.append("P");
if (flags.empty()) flags = "N";
return flags;
@@ -504,6 +505,11 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
}
}
+ // reset the ASKING flag after executing the next query
+ if (IsFlagEnabled(kAsking)) {
+ DisableFlag(kAsking);
+ }
+
// We don't execute commands, but queue them, ant then execute in EXEC
command
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdMulti)) {
multi_cmds_.emplace_back(cmd_tokens);
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index c206a158..f3015fcf 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -46,6 +46,7 @@ class Connection : public EvbufCallbackBase<Connection> {
kCloseAsync = 1 << 7,
kMultiExec = 1 << 8,
kReadOnly = 1 << 9,
+ kAsking = 1 << 10,
};
explicit Connection(bufferevent *bev, Worker *owner);
diff --git a/tests/gocase/integration/slotimport/slotimport_test.go
b/tests/gocase/integration/slotimport/slotimport_test.go
index 1d427b80..a3566cab 100644
--- a/tests/gocase/integration/slotimport/slotimport_test.go
+++ b/tests/gocase/integration/slotimport/slotimport_test.go
@@ -172,3 +172,44 @@ func TestImportedServer(t *testing.T) {
require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
})
}
+
+func TestServiceImportingSlot(t *testing.T) {
+ ctx := context.Background()
+
+ mockID0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ mockSrv0Host := "127.0.0.1"
+ mockSrv0Port := 6666
+
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv1.Close() }()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", mockID0,
mockSrv0Host, mockSrv0Port)
+ clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1,
srv1.Host(), srv1.Port())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
+
+ slotNum := 1
+ require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum,
0).Val())
+
+ // create a new client that is not importing
+ cli := srv1.NewClient()
+ slotKey := util.SlotTable[slotNum]
+
+ t.Run("IMPORT - query a key in importing slot without asking", func(t
*testing.T) {
+ util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(),
fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port))
+ })
+
+ t.Run("IMPORT - query a key in importing slot after asking", func(t
*testing.T) {
+ require.Equal(t, "OK", cli.Do(ctx, "asking").Val())
+ require.NoError(t, cli.Type(ctx, slotKey).Err())
+ })
+
+ t.Run("IMPORT - asking flag will be reset after executing", func(t
*testing.T) {
+ require.Equal(t, "OK", cli.Do(ctx, "asking").Val())
+ require.NoError(t, cli.Type(ctx, slotKey).Err())
+ util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(),
fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port))
+ })
+}