This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 3ce7c509 Add support of the ASKING command (#2273)
3ce7c509 is described below

commit 3ce7c509951107afd34405b1da53eb380865eaa1
Author: anotherJJz <[email protected]>
AuthorDate: Sun Apr 28 17:03:52 2024 +0800

    Add support of the ASKING command (#2273)
---
 src/cluster/cluster.cc                             |  5 +--
 src/commands/cmd_cluster.cc                        | 12 ++++++-
 src/server/redis_connection.cc                     |  6 ++++
 src/server/redis_connection.h                      |  1 +
 .../integration/slotimport/slotimport_test.go      | 41 ++++++++++++++++++++++
 5 files changed, 62 insertions(+), 3 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 1244ea4a..74aec5d0 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -846,12 +846,13 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
     return Status::OK();  // I'm serving this slot
   }
 
-  if (myself_ && myself_->importing_slot == slot && conn->IsImporting()) {
+  if (myself_ && myself_->importing_slot == slot &&
+      (conn->IsImporting() || conn->IsFlagEnabled(redis::Connection::kAsking))) {
     // While data migrating, the topology of the destination node has not been changed.
     // The destination node has to serve the requests from the migrating slot,
     // although the slot is not belong to itself. Therefore, we record the importing slot
     // and mark the importing connection to accept the importing data.
-    return Status::OK();  // I'm serving the importing connection
+    return Status::OK();  // I'm serving the importing connection or asking connection
   }
 
   if (myself_ && imported_slots_.count(slot)) {
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index e371e78f..8ada4e8e 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -338,9 +338,19 @@ class CommandReadWrite : public Commander {
   }
 };
 
+class CommandAsking : public Commander {
+ public:
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    conn->EnableFlag(redis::Connection::kAsking);
+    *output = redis::SimpleString("OK");
+    return Status::OK();
+  }
+};
+
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag),
                         MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag),
                         MakeCmdAttr<CommandReadOnly>("readonly", 1, "cluster no-multi", 0, 0, 0),
-                        MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster no-multi", 0, 0, 0), )
+                        MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster no-multi", 0, 0, 0),
+                        MakeCmdAttr<CommandAsking>("asking", 1, "cluster", 0, 0, 0), )
 
 }  // namespace redis
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index a470f225..4a84f0a7 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -220,6 +220,7 @@ std::string Connection::GetFlags() const {
   if (IsFlagEnabled(kSlave)) flags.append("S");
   if (IsFlagEnabled(kCloseAfterReply)) flags.append("c");
   if (IsFlagEnabled(kMonitor)) flags.append("M");
+  if (IsFlagEnabled(kAsking)) flags.append("A");
   if (!subscribe_channels_.empty() || !subscribe_patterns_.empty()) flags.append("P");
   if (flags.empty()) flags = "N";
   return flags;
@@ -504,6 +505,11 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
       }
     }
 
+    // reset the ASKING flag after executing the next query
+    if (IsFlagEnabled(kAsking)) {
+      DisableFlag(kAsking);
+    }
+
     // We don't execute commands, but queue them, ant then execute in EXEC command
     if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdMulti)) {
       multi_cmds_.emplace_back(cmd_tokens);
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index c206a158..f3015fcf 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -46,6 +46,7 @@ class Connection : public EvbufCallbackBase<Connection> {
     kCloseAsync = 1 << 7,
     kMultiExec = 1 << 8,
     kReadOnly = 1 << 9,
+    kAsking = 1 << 10,
   };
 
   explicit Connection(bufferevent *bev, Worker *owner);
diff --git a/tests/gocase/integration/slotimport/slotimport_test.go b/tests/gocase/integration/slotimport/slotimport_test.go
index 1d427b80..a3566cab 100644
--- a/tests/gocase/integration/slotimport/slotimport_test.go
+++ b/tests/gocase/integration/slotimport/slotimport_test.go
@@ -172,3 +172,44 @@ func TestImportedServer(t *testing.T) {
                require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
        })
 }
+
+func TestServiceImportingSlot(t *testing.T) {
+       ctx := context.Background()
+
+       mockID0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       mockSrv0Host := "127.0.0.1"
+       mockSrv0Port := 6666
+
+       srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       defer func() { srv1.Close() }()
+       rdb1 := srv1.NewClient()
+       defer func() { require.NoError(t, rdb1.Close()) }()
+       id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", mockID0, mockSrv0Host, mockSrv0Port)
+       clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1, srv1.Host(), srv1.Port())
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+       slotNum := 1
+       require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum, 0).Val())
+
+       // create a new client that is not importing
+       cli := srv1.NewClient()
+       slotKey := util.SlotTable[slotNum]
+
+       t.Run("IMPORT - query a key in importing slot without asking", func(t *testing.T) {
+               util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(), fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port))
+       })
+
+       t.Run("IMPORT - query a key in importing slot after asking", func(t *testing.T) {
+               require.Equal(t, "OK", cli.Do(ctx, "asking").Val())
+               require.NoError(t, cli.Type(ctx, slotKey).Err())
+       })
+
+       t.Run("IMPORT - asking flag will be reset after executing", func(t *testing.T) {
+               require.Equal(t, "OK", cli.Do(ctx, "asking").Val())
+               require.NoError(t, cli.Type(ctx, slotKey).Err())
+               util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(), fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port))
+       })
+}

Reply via email to