This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 58d177fc Unify the workflow of command executing in connection and
scripting (#2095)
58d177fc is described below
commit 58d177fc1d78198a3bfa21f53db8c6a5031dc380
Author: Twice <[email protected]>
AuthorDate: Sat Feb 10 18:46:54 2024 +0900
Unify the workflow of command executing in connection and scripting (#2095)
---
src/commands/commander.h | 4 ++++
src/server/redis_connection.cc | 41 +++++++++++++++++++++++------------------
src/server/redis_connection.h | 2 ++
src/server/server.cc | 12 ++++++------
src/server/server.h | 2 +-
src/storage/scripting.cc | 38 +++++++++++++-------------------------
6 files changed, 49 insertions(+), 50 deletions(-)
diff --git a/src/commands/commander.h b/src/commands/commander.h
index 85cdde76..d759bd20 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -149,6 +149,10 @@ struct CommandAttributes {
if (flag_gen) res |= flag_gen(args);
return res;
}
+
+ bool CheckArity(int cmd_size) const {
+ return !((arity > 0 && cmd_size != arity) || (arity < 0 && cmd_size < -arity));
+ }
};
using CommandMap = std::map<std::string, const CommandAttributes *>;
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index aa830ac8..80904507 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -391,6 +391,23 @@ void Connection::RecordProfilingSampleIfNeed(const
std::string &cmd, uint64_t du
srv_->GetPerfLog()->PushEntry(std::move(entry));
}
+Status Connection::ExecuteCommand(const std::string &cmd_name, const
std::vector<std::string> &cmd_tokens,
+ Commander *current_cmd, std::string *reply) {
+ srv_->stats.IncrCalls(cmd_name);
+
+ auto start = std::chrono::high_resolution_clock::now();
+ bool is_profiling = IsProfilingEnabled(cmd_name);
+ auto s = current_cmd->Execute(srv_, this, reply);
+ auto end = std::chrono::high_resolution_clock::now();
+ uint64_t duration =
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
+ if (is_profiling) RecordProfilingSampleIfNeed(cmd_name, duration);
+
+ srv_->SlowlogPushEntryIfNeeded(&cmd_tokens, duration, this);
+ srv_->stats.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
+ srv_->FeedMonitorConns(this, cmd_tokens);
+ return s;
+}
+
void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
Config *config = srv_->GetConfig();
std::string reply, password = config->requirepass;
@@ -403,13 +420,13 @@ void
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec)
break;
- std::unique_ptr<Commander> current_cmd;
- auto s = srv_->LookupAndCreateCommand(cmd_tokens.front(), &current_cmd);
- if (!s.IsOK()) {
+ auto cmd_s = srv_->LookupAndCreateCommand(cmd_tokens.front());
+ if (!cmd_s.IsOK()) {
if (is_multi_exec) multi_error_ = true;
Reply(redis::Error("ERR unknown command " + cmd_tokens.front()));
continue;
}
+ auto current_cmd = std::move(*cmd_s);
if (GetNamespace().empty()) {
if (!password.empty() && util::ToLower(cmd_tokens.front()) != "auth" &&
@@ -457,16 +474,15 @@ void
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
continue;
}
- int arity = attributes->arity;
int tokens = static_cast<int>(cmd_tokens.size());
- if ((arity > 0 && tokens != arity) || (arity < 0 && tokens < -arity)) {
+ if (!attributes->CheckArity(tokens)) {
if (is_multi_exec) multi_error_ = true;
Reply(redis::Error("ERR wrong number of arguments"));
continue;
}
current_cmd->SetArgs(cmd_tokens);
- s = current_cmd->Parse();
+ auto s = current_cmd->Parse();
if (!s.IsOK()) {
if (is_multi_exec) multi_error_ = true;
Reply(redis::Error("ERR " + s.Msg()));
@@ -515,18 +531,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
}
SetLastCmd(cmd_name);
- srv_->stats.IncrCalls(cmd_name);
-
- auto start = std::chrono::high_resolution_clock::now();
- bool is_profiling = IsProfilingEnabled(cmd_name);
- s = current_cmd->Execute(srv_, this, &reply);
- auto end = std::chrono::high_resolution_clock::now();
- uint64_t duration =
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
- if (is_profiling) RecordProfilingSampleIfNeed(cmd_name, duration);
-
- srv_->SlowlogPushEntryIfNeeded(&cmd_tokens, duration, this);
- srv_->stats.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
- srv_->FeedMonitorConns(this, cmd_tokens);
+ s = ExecuteCommand(cmd_name, cmd_tokens, current_cmd.get(), &reply);
// Break the execution loop when occurring the blocking command like BLPOP
or BRPOP,
// it will suspend the connection and wait for the wakeup signal.
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index c90a349a..2211e2e1 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -148,6 +148,8 @@ class Connection : public EvbufCallbackBase<Connection> {
evbuffer *Output() { return bufferevent_get_output(bev_); }
bufferevent *GetBufferEvent() { return bev_; }
void ExecuteCommands(std::deque<CommandTokens> *to_process_cmds);
+ Status ExecuteCommand(const std::string &cmd_name, const
std::vector<std::string> &cmd_tokens, Commander *current_cmd,
+ std::string *reply);
bool IsProfilingEnabled(const std::string &cmd);
void RecordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
void SetImporting() { importing_ = true; }
diff --git a/src/server/server.cc b/src/server/server.cc
index 4d67fc64..cd6b4acd 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -679,7 +679,7 @@ void Server::OnEntryAddedToStream(const std::string &ns,
const std::string &key,
void Server::updateCachedTime() { unix_time.store(util::GetTimeStamp()); }
int Server::IncrClientNum() {
- total_clients_.fetch_add(1, std::memory_order::memory_order_relaxed);
+ total_clients_.fetch_add(1, std::memory_order_relaxed);
return connected_clients_.fetch_add(1, std::memory_order_relaxed);
}
@@ -1597,7 +1597,7 @@ ReplState Server::GetReplicationState() {
return kReplConnecting;
}
-Status Server::LookupAndCreateCommand(const std::string &cmd_name,
std::unique_ptr<redis::Commander> *cmd) {
+StatusOr<std::unique_ptr<redis::Commander>>
Server::LookupAndCreateCommand(const std::string &cmd_name) {
if (cmd_name.empty()) return {Status::RedisUnknownCmd};
auto commands = redis::CommandTable::Get();
@@ -1606,11 +1606,11 @@ Status Server::LookupAndCreateCommand(const std::string
&cmd_name, std::unique_p
return {Status::RedisUnknownCmd};
}
- auto redis_cmd = cmd_iter->second;
- *cmd = redis_cmd->factory();
- (*cmd)->SetAttributes(redis_cmd);
+ auto cmd_attr = cmd_iter->second;
+ auto cmd = cmd_attr->factory();
+ cmd->SetAttributes(cmd_attr);
- return Status::OK();
+ return cmd;
}
Status Server::ScriptExists(const std::string &sha) {
diff --git a/src/server/server.h b/src/server/server.h
index a86eedf1..4f8fe314 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -177,7 +177,7 @@ class Server {
bool IsStopped() const { return stop_; }
bool IsLoading() const { return is_loading_; }
Config *GetConfig() { return config_; }
- static Status LookupAndCreateCommand(const std::string &cmd_name,
std::unique_ptr<redis::Commander> *cmd);
+ static StatusOr<std::unique_ptr<redis::Commander>>
LookupAndCreateCommand(const std::string &cmd_name);
void AdjustOpenFilesLimit();
void AdjustWorkerThreads();
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 8105d321..e4c6d1fb 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -707,36 +707,32 @@ int RedisGenericCommand(lua_State *lua, int raise_error) {
}
}
- auto commands = redis::CommandTable::Get();
- auto cmd_iter = commands->find(util::ToLower(args[0]));
- if (cmd_iter == commands->end()) {
+ auto cmd_s = Server::LookupAndCreateCommand(args[0]);
+ if (!cmd_s) {
PushError(lua, "Unknown Redis command called from Lua script");
return raise_error ? RaiseError(lua) : 1;
}
+ auto cmd = *std::move(cmd_s);
- auto redis_cmd = cmd_iter->second;
- if (read_only && !(redis_cmd->flags & redis::kCmdReadOnly)) {
+ auto attributes = cmd->GetAttributes();
+ auto cmd_flags = attributes->GenerateFlags(args);
+
+ if (read_only && !(cmd_flags & redis::kCmdReadOnly)) {
PushError(lua, "Write commands are not allowed from read-only scripts");
return raise_error ? RaiseError(lua) : 1;
}
- auto cmd = redis_cmd->factory();
- cmd->SetAttributes(redis_cmd);
- cmd->SetArgs(args);
-
- int arity = cmd->GetAttributes()->arity;
- if (((arity > 0 && argc != arity) || (arity < 0 && argc < -arity))) {
+ if (!attributes->CheckArity(argc)) {
PushError(lua, "Wrong number of args calling Redis command From Lua
script");
return raise_error ? RaiseError(lua) : 1;
}
- auto attributes = cmd->GetAttributes();
- auto cmd_flags = attributes->GenerateFlags(args);
+
if (cmd_flags & redis::kCmdNoScript) {
PushError(lua, "This Redis command is not allowed from scripts");
return raise_error ? RaiseError(lua) : 1;
}
- std::string cmd_name = util::ToLower(args[0]);
+ std::string cmd_name = attributes->name;
auto srv = GetServer(lua);
Config *config = srv->GetConfig();
@@ -763,23 +759,15 @@ int RedisGenericCommand(lua_State *lua, int raise_error) {
return raise_error ? RaiseError(lua) : 1;
}
- auto s = cmd->Parse(args);
+ cmd->SetArgs(args);
+ auto s = cmd->Parse();
if (!s) {
PushError(lua, s.Msg().data());
return raise_error ? RaiseError(lua) : 1;
}
- srv->stats.IncrCalls(cmd_name);
- auto start = std::chrono::high_resolution_clock::now();
- bool is_profiling = conn->IsProfilingEnabled(cmd_name);
std::string output;
- s = cmd->Execute(srv, srv->GetCurrentConnection(), &output);
- auto end = std::chrono::high_resolution_clock::now();
- uint64_t duration =
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
- if (is_profiling) conn->RecordProfilingSampleIfNeed(cmd_name, duration);
- srv->SlowlogPushEntryIfNeeded(&args, duration, conn);
- srv->stats.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
- srv->FeedMonitorConns(conn, args);
+ s = conn->ExecuteCommand(cmd_name, args, cmd.get(), &output);
if (!s) {
PushError(lua, s.Msg().data());
return raise_error ? RaiseError(lua) : 1;