This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 58d177fc Unify the workflow of command executing in connection and 
scripting (#2095)
58d177fc is described below

commit 58d177fc1d78198a3bfa21f53db8c6a5031dc380
Author: Twice <[email protected]>
AuthorDate: Sat Feb 10 18:46:54 2024 +0900

    Unify the workflow of command executing in connection and scripting (#2095)
---
 src/commands/commander.h       |  4 ++++
 src/server/redis_connection.cc | 41 +++++++++++++++++++++++------------------
 src/server/redis_connection.h  |  2 ++
 src/server/server.cc           | 12 ++++++------
 src/server/server.h            |  2 +-
 src/storage/scripting.cc       | 38 +++++++++++++-------------------------
 6 files changed, 49 insertions(+), 50 deletions(-)

diff --git a/src/commands/commander.h b/src/commands/commander.h
index 85cdde76..d759bd20 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -149,6 +149,10 @@ struct CommandAttributes {
     if (flag_gen) res |= flag_gen(args);
     return res;
   }
+
+  bool CheckArity(int cmd_size) const {
+    return !((arity > 0 && cmd_size != arity) || (arity < 0 && cmd_size < 
-arity));
+  }
 };
 
 using CommandMap = std::map<std::string, const CommandAttributes *>;
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index aa830ac8..80904507 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -391,6 +391,23 @@ void Connection::RecordProfilingSampleIfNeed(const 
std::string &cmd, uint64_t du
   srv_->GetPerfLog()->PushEntry(std::move(entry));
 }
 
+Status Connection::ExecuteCommand(const std::string &cmd_name, const 
std::vector<std::string> &cmd_tokens,
+                                  Commander *current_cmd, std::string *reply) {
+  srv_->stats.IncrCalls(cmd_name);
+
+  auto start = std::chrono::high_resolution_clock::now();
+  bool is_profiling = IsProfilingEnabled(cmd_name);
+  auto s = current_cmd->Execute(srv_, this, reply);
+  auto end = std::chrono::high_resolution_clock::now();
+  uint64_t duration = 
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
+  if (is_profiling) RecordProfilingSampleIfNeed(cmd_name, duration);
+
+  srv_->SlowlogPushEntryIfNeeded(&cmd_tokens, duration, this);
+  srv_->stats.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
+  srv_->FeedMonitorConns(this, cmd_tokens);
+  return s;
+}
+
 void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
   Config *config = srv_->GetConfig();
   std::string reply, password = config->requirepass;
@@ -403,13 +420,13 @@ void 
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
     bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
     if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec) 
break;
 
-    std::unique_ptr<Commander> current_cmd;
-    auto s = srv_->LookupAndCreateCommand(cmd_tokens.front(), &current_cmd);
-    if (!s.IsOK()) {
+    auto cmd_s = srv_->LookupAndCreateCommand(cmd_tokens.front());
+    if (!cmd_s.IsOK()) {
       if (is_multi_exec) multi_error_ = true;
       Reply(redis::Error("ERR unknown command " + cmd_tokens.front()));
       continue;
     }
+    auto current_cmd = std::move(*cmd_s);
 
     if (GetNamespace().empty()) {
       if (!password.empty() && util::ToLower(cmd_tokens.front()) != "auth" &&
@@ -457,16 +474,15 @@ void 
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
       continue;
     }
 
-    int arity = attributes->arity;
     int tokens = static_cast<int>(cmd_tokens.size());
-    if ((arity > 0 && tokens != arity) || (arity < 0 && tokens < -arity)) {
+    if (!attributes->CheckArity(tokens)) {
       if (is_multi_exec) multi_error_ = true;
       Reply(redis::Error("ERR wrong number of arguments"));
       continue;
     }
 
     current_cmd->SetArgs(cmd_tokens);
-    s = current_cmd->Parse();
+    auto s = current_cmd->Parse();
     if (!s.IsOK()) {
       if (is_multi_exec) multi_error_ = true;
       Reply(redis::Error("ERR " + s.Msg()));
@@ -515,18 +531,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> 
*to_process_cmds) {
     }
 
     SetLastCmd(cmd_name);
-    srv_->stats.IncrCalls(cmd_name);
-
-    auto start = std::chrono::high_resolution_clock::now();
-    bool is_profiling = IsProfilingEnabled(cmd_name);
-    s = current_cmd->Execute(srv_, this, &reply);
-    auto end = std::chrono::high_resolution_clock::now();
-    uint64_t duration = 
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
-    if (is_profiling) RecordProfilingSampleIfNeed(cmd_name, duration);
-
-    srv_->SlowlogPushEntryIfNeeded(&cmd_tokens, duration, this);
-    srv_->stats.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
-    srv_->FeedMonitorConns(this, cmd_tokens);
+    s = ExecuteCommand(cmd_name, cmd_tokens, current_cmd.get(), &reply);
 
     // Break the execution loop when occurring the blocking command like BLPOP 
or BRPOP,
     // it will suspend the connection and wait for the wakeup signal.
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index c90a349a..2211e2e1 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -148,6 +148,8 @@ class Connection : public EvbufCallbackBase<Connection> {
   evbuffer *Output() { return bufferevent_get_output(bev_); }
   bufferevent *GetBufferEvent() { return bev_; }
   void ExecuteCommands(std::deque<CommandTokens> *to_process_cmds);
+  Status ExecuteCommand(const std::string &cmd_name, const 
std::vector<std::string> &cmd_tokens, Commander *current_cmd,
+                        std::string *reply);
   bool IsProfilingEnabled(const std::string &cmd);
   void RecordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
   void SetImporting() { importing_ = true; }
diff --git a/src/server/server.cc b/src/server/server.cc
index 4d67fc64..cd6b4acd 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -679,7 +679,7 @@ void Server::OnEntryAddedToStream(const std::string &ns, 
const std::string &key,
 void Server::updateCachedTime() { unix_time.store(util::GetTimeStamp()); }
 
 int Server::IncrClientNum() {
-  total_clients_.fetch_add(1, std::memory_order::memory_order_relaxed);
+  total_clients_.fetch_add(1, std::memory_order_relaxed);
   return connected_clients_.fetch_add(1, std::memory_order_relaxed);
 }
 
@@ -1597,7 +1597,7 @@ ReplState Server::GetReplicationState() {
   return kReplConnecting;
 }
 
-Status Server::LookupAndCreateCommand(const std::string &cmd_name, 
std::unique_ptr<redis::Commander> *cmd) {
+StatusOr<std::unique_ptr<redis::Commander>> 
Server::LookupAndCreateCommand(const std::string &cmd_name) {
   if (cmd_name.empty()) return {Status::RedisUnknownCmd};
 
   auto commands = redis::CommandTable::Get();
@@ -1606,11 +1606,11 @@ Status Server::LookupAndCreateCommand(const std::string 
&cmd_name, std::unique_p
     return {Status::RedisUnknownCmd};
   }
 
-  auto redis_cmd = cmd_iter->second;
-  *cmd = redis_cmd->factory();
-  (*cmd)->SetAttributes(redis_cmd);
+  auto cmd_attr = cmd_iter->second;
+  auto cmd = cmd_attr->factory();
+  cmd->SetAttributes(cmd_attr);
 
-  return Status::OK();
+  return cmd;
 }
 
 Status Server::ScriptExists(const std::string &sha) {
diff --git a/src/server/server.h b/src/server/server.h
index a86eedf1..4f8fe314 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -177,7 +177,7 @@ class Server {
   bool IsStopped() const { return stop_; }
   bool IsLoading() const { return is_loading_; }
   Config *GetConfig() { return config_; }
-  static Status LookupAndCreateCommand(const std::string &cmd_name, 
std::unique_ptr<redis::Commander> *cmd);
+  static StatusOr<std::unique_ptr<redis::Commander>> 
LookupAndCreateCommand(const std::string &cmd_name);
   void AdjustOpenFilesLimit();
   void AdjustWorkerThreads();
 
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 8105d321..e4c6d1fb 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -707,36 +707,32 @@ int RedisGenericCommand(lua_State *lua, int raise_error) {
     }
   }
 
-  auto commands = redis::CommandTable::Get();
-  auto cmd_iter = commands->find(util::ToLower(args[0]));
-  if (cmd_iter == commands->end()) {
+  auto cmd_s = Server::LookupAndCreateCommand(args[0]);
+  if (!cmd_s) {
     PushError(lua, "Unknown Redis command called from Lua script");
     return raise_error ? RaiseError(lua) : 1;
   }
+  auto cmd = *std::move(cmd_s);
 
-  auto redis_cmd = cmd_iter->second;
-  if (read_only && !(redis_cmd->flags & redis::kCmdReadOnly)) {
+  auto attributes = cmd->GetAttributes();
+  auto cmd_flags = attributes->GenerateFlags(args);
+
+  if (read_only && !(cmd_flags & redis::kCmdReadOnly)) {
     PushError(lua, "Write commands are not allowed from read-only scripts");
     return raise_error ? RaiseError(lua) : 1;
   }
 
-  auto cmd = redis_cmd->factory();
-  cmd->SetAttributes(redis_cmd);
-  cmd->SetArgs(args);
-
-  int arity = cmd->GetAttributes()->arity;
-  if (((arity > 0 && argc != arity) || (arity < 0 && argc < -arity))) {
+  if (!attributes->CheckArity(argc)) {
     PushError(lua, "Wrong number of args calling Redis command From Lua 
script");
     return raise_error ? RaiseError(lua) : 1;
   }
-  auto attributes = cmd->GetAttributes();
-  auto cmd_flags = attributes->GenerateFlags(args);
+
   if (cmd_flags & redis::kCmdNoScript) {
     PushError(lua, "This Redis command is not allowed from scripts");
     return raise_error ? RaiseError(lua) : 1;
   }
 
-  std::string cmd_name = util::ToLower(args[0]);
+  std::string cmd_name = attributes->name;
 
   auto srv = GetServer(lua);
   Config *config = srv->GetConfig();
@@ -763,23 +759,15 @@ int RedisGenericCommand(lua_State *lua, int raise_error) {
     return raise_error ? RaiseError(lua) : 1;
   }
 
-  auto s = cmd->Parse(args);
+  cmd->SetArgs(args);
+  auto s = cmd->Parse();
   if (!s) {
     PushError(lua, s.Msg().data());
     return raise_error ? RaiseError(lua) : 1;
   }
 
-  srv->stats.IncrCalls(cmd_name);
-  auto start = std::chrono::high_resolution_clock::now();
-  bool is_profiling = conn->IsProfilingEnabled(cmd_name);
   std::string output;
-  s = cmd->Execute(srv, srv->GetCurrentConnection(), &output);
-  auto end = std::chrono::high_resolution_clock::now();
-  uint64_t duration = 
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
-  if (is_profiling) conn->RecordProfilingSampleIfNeed(cmd_name, duration);
-  srv->SlowlogPushEntryIfNeeded(&args, duration, conn);
-  srv->stats.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
-  srv->FeedMonitorConns(conn, args);
+  s = conn->ExecuteCommand(cmd_name, args, cmd.get(), &output);
   if (!s) {
     PushError(lua, s.Msg().data());
     return raise_error ? RaiseError(lua) : 1;

Reply via email to