This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new c85daaca Add dynamic flag generator to replace adhoc logic in
ExecuteCommands (#1655)
c85daaca is described below
commit c85daaca9b44ca66c97c36f663470526549bb9e0
Author: Twice <[email protected]>
AuthorDate: Wed Aug 9 13:01:50 2023 +0800
Add dynamic flag generator to replace adhoc logic in ExecuteCommands (#1655)
We add a dynamic flag generator, driven by the command arguments, to replace
the ad hoc exclusive-check logic in Connection::ExecuteCommands().
We also fix and refactor GetKeysFromCommand() in this PR.
---
src/cluster/cluster.cc | 6 ++--
src/commands/cmd_cluster.cc | 13 +++++++--
src/commands/cmd_geo.cc | 2 +-
src/commands/cmd_server.cc | 19 +++++++++++--
src/commands/commander.cc | 39 +++++++++++++++-----------
src/commands/commander.h | 62 ++++++++++++++++++++++++++++--------------
src/server/redis_connection.cc | 35 ++++++++++--------------
src/server/server.cc | 2 +-
src/storage/scripting.cc | 5 ++--
9 files changed, 116 insertions(+), 67 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index e345dd92..665aa8f9 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -742,7 +742,7 @@ bool Cluster::IsWriteForbiddenSlot(int slot) { return
svr_->slot_migrator->GetFo
Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes,
const std::vector<std::string> &cmd_tokens,
redis::Connection *conn) {
std::vector<int> keys_indexes;
- auto s = redis::GetKeysFromCommand(attributes->name,
static_cast<int>(cmd_tokens.size()), &keys_indexes);
+ auto s = redis::GetKeysFromCommand(attributes, cmd_tokens, &keys_indexes);
// No keys
if (!s.IsOK()) return Status::OK();
@@ -773,7 +773,7 @@ Status Cluster::CanExecByMySelf(const
redis::CommandAttributes *attributes, cons
}
// To keep data consistency, slot will be forbidden write while sending
the last incremental data.
// During this phase, the requests of the migrating slot has to be
rejected.
- if (attributes->IsWrite() && IsWriteForbiddenSlot(slot)) {
+ if ((attributes->flags & redis::kCmdWrite) && IsWriteForbiddenSlot(slot)) {
return {Status::RedisExecErr, "TRYAGAIN Can't write to slot being
migrated which is in write forbidden phase"};
}
@@ -795,7 +795,7 @@ Status Cluster::CanExecByMySelf(const
redis::CommandAttributes *attributes, cons
return Status::OK(); // I'm serving the imported slot
}
- if (myself_ && myself_->role == kClusterSlave && !attributes->IsWrite() &&
+ if (myself_ && myself_->role == kClusterSlave && !(attributes->flags &
redis::kCmdWrite) &&
nodes_.find(myself_->master_id) != nodes_.end() &&
nodes_[myself_->master_id] == slots_nodes_[slot]) {
return Status::OK(); // My master is serving this slot
}
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index e34186ee..b7882251 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -284,7 +284,16 @@ class CommandClusterX : public Commander {
std::unique_ptr<SyncMigrateContext> sync_migrate_ctx_ = nullptr;
};
-REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster
no-script", 0, 0, 0),
- MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster
no-script", 0, 0, 0), )
+static uint64_t GenerateClusterFlag(const std::vector<std::string> &args) {
+ if (args.size() >= 2 && Cluster::SubCommandIsExecExclusive(args[1])) {
+ return kCmdExclusive;
+ }
+
+ return 0;
+}
+
+REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster
no-script", 0, 0, 0, GenerateClusterFlag),
+ MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster
no-script", 0, 0, 0,
+ GenerateClusterFlag), )
} // namespace redis
diff --git a/src/commands/cmd_geo.cc b/src/commands/cmd_geo.cc
index 0b732833..6795e8d4 100644
--- a/src/commands/cmd_geo.cc
+++ b/src/commands/cmd_geo.cc
@@ -275,7 +275,7 @@ class CommandGeoRadius : public CommandGeoBase {
count_ = *parse_result;
i += 2;
- } else if (attributes_->IsWrite() &&
+ } else if ((attributes_->flags & kCmdWrite) &&
(util::ToLower(args_[i]) == "store" ||
util::ToLower(args_[i]) == "storedist") &&
i + 1 < args_.size()) {
store_key_ = args_[i + 1];
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 8e561435..d3c6b9cf 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -26,6 +26,7 @@
#include "server/redis_connection.h"
#include "server/server.h"
#include "stats/disk_stats.h"
+#include "string_util.h"
#include "time_util.h"
namespace redis {
@@ -612,8 +613,14 @@ class CommandCommand : public Commander {
} else if (sub_command == "info") {
GetCommandsInfo(output, std::vector<std::string>(args_.begin() + 2,
args_.end()));
} else if (sub_command == "getkeys") {
+ auto cmd_iter =
command_details::original_commands.find(util::ToLower(args_[2]));
+ if (cmd_iter == command_details::original_commands.end()) {
+ return {Status::RedisUnknownCmd, "Invalid command specified"};
+ }
+
std::vector<int> keys_indexes;
- auto s = GetKeysFromCommand(args_[2], static_cast<int>(args_.size()) -
2, &keys_indexes);
+ auto s = GetKeysFromCommand(cmd_iter->second,
std::vector<std::string>(args_.begin() + 2, args_.end()),
+ &keys_indexes);
if (!s.IsOK()) return s;
if (keys_indexes.size() == 0) {
@@ -964,12 +971,20 @@ class CommandStats : public Commander {
}
};
+static uint64_t GenerateConfigFlag(const std::vector<std::string> &args) {
+ if (args.size() >= 2 && util::EqualICase(args[1], "set")) {
+ return kCmdExclusive;
+ }
+
+ return 0;
+}
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only
ok-loading", 0, 0, 0),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0,
0, 0),
MakeCmdAttr<CommandSelect>("select", 2, "read-only",
0, 0, 0),
MakeCmdAttr<CommandInfo>("info", -1, "read-only
ok-loading", 0, 0, 0),
MakeCmdAttr<CommandRole>("role", 1, "read-only
ok-loading", 0, 0, 0),
- MakeCmdAttr<CommandConfig>("config", -2, "read-only",
0, 0, 0),
+ MakeCmdAttr<CommandConfig>("config", -2, "read-only",
0, 0, 0, GenerateConfigFlag),
MakeCmdAttr<CommandNamespace>("namespace", -3,
"read-only exclusive", 0, 0, 0),
MakeCmdAttr<CommandKeys>("keys", 2, "read-only", 0, 0,
0),
MakeCmdAttr<CommandFlushDB>("flushdb", 1, "write", 0,
0, 0),
diff --git a/src/commands/commander.cc b/src/commands/commander.cc
index 9c08a386..0fa8bdf0 100644
--- a/src/commands/commander.cc
+++ b/src/commands/commander.cc
@@ -46,7 +46,7 @@ std::string GetCommandInfo(const CommandAttributes
*command_attributes) {
command.append(redis::BulkString(command_attributes->name));
command.append(redis::Integer(command_attributes->arity));
command_flags.append(redis::MultiLen(1));
- command_flags.append(redis::BulkString(command_attributes->IsWrite() ?
"write" : "readonly"));
+ command_flags.append(redis::BulkString(command_attributes->flags & kCmdWrite
? "write" : "readonly"));
command.append(command_flags);
command.append(redis::Integer(command_attributes->key_range.first_key));
command.append(redis::Integer(command_attributes->key_range.last_key));
@@ -77,30 +77,37 @@ void GetCommandsInfo(std::string *info, const
std::vector<std::string> &cmd_name
}
}
-Status GetKeysFromCommand(const std::string &cmd_name, int argc,
std::vector<int> *keys_indexes) {
- auto cmd_iter =
command_details::original_commands.find(util::ToLower(cmd_name));
- if (cmd_iter == command_details::original_commands.end()) {
- return {Status::RedisUnknownCmd, "Invalid command specified"};
+void DumpKeyRange(std::vector<int> &keys_index, int argc, const
CommandKeyRange &range) {
+ auto last = range.last_key;
+ if (last < 0) last = argc + last;
+
+ for (int i = range.first_key; i <= last; i += range.key_step) {
+ keys_index.emplace_back(i);
}
+}
- auto command_attribute = cmd_iter->second;
- if (command_attribute->key_range.first_key == 0) {
+Status GetKeysFromCommand(const CommandAttributes *attributes, const
std::vector<std::string> &cmd_tokens,
+ std::vector<int> *keys_index) {
+ if (attributes->key_range.first_key == 0) {
return {Status::NotOK, "The command has no key arguments"};
}
- if (command_attribute->key_range.first_key < 0) {
- return {Status::NotOK, "The command has dynamic positions of key
arguments"};
- }
+ int argc = static_cast<int>(cmd_tokens.size());
- if ((command_attribute->arity > 0 && command_attribute->arity != argc) ||
argc < -command_attribute->arity) {
+ if ((attributes->arity > 0 && attributes->arity != argc) || argc <
-attributes->arity) {
return {Status::NotOK, "Invalid number of arguments specified for
command"};
}
- auto last = command_attribute->key_range.last_key;
- if (last < 0) last = argc + last;
-
- for (int j = command_attribute->key_range.first_key; j <= last; j +=
command_attribute->key_range.key_step) {
- keys_indexes->emplace_back(j);
+ if (attributes->key_range.first_key > 0) {
+ DumpKeyRange(*keys_index, argc, attributes->key_range);
+ } else if (attributes->key_range.first_key == -1) {
+ DumpKeyRange(*keys_index, argc, attributes->key_range_gen(cmd_tokens));
+ } else if (attributes->key_range.first_key == -2) {
+ for (const auto &range : attributes->key_range_vec_gen(cmd_tokens)) {
+ DumpKeyRange(*keys_index, argc, range);
+ }
+ } else {
+ return {Status::NotOK, "The key range specification is invalid"};
}
return Status::OK();
diff --git a/src/commands/commander.h b/src/commands/commander.h
index 874957a4..0bf6448d 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -50,7 +50,7 @@ namespace redis {
class Connection;
struct CommandAttributes;
-enum CommandFlags {
+enum CommandFlags : uint64_t {
kCmdWrite = 1ULL << 0, // "write" flag
kCmdReadOnly = 1ULL << 1, // "read-only" flag
kCmdReplication = 1ULL << 2, // "replication" flag
@@ -115,7 +115,10 @@ using CommandKeyRangeGen =
std::function<CommandKeyRange(const std::vector<std::
using CommandKeyRangeVecGen = std::function<std::vector<CommandKeyRange>(const
std::vector<std::string> &)>;
+using AdditionalFlagGen = std::function<uint64_t(const
std::vector<std::string> &)>;
+
struct CommandAttributes {
+ // command name
std::string name;
// number of command arguments
@@ -123,9 +126,16 @@ struct CommandAttributes {
// negative number -n means number of arguments is equal to or large than n
int arity;
+ // space-splitted flag strings to initialize flags
std::string description;
+
+ // bitmap of enum CommandFlags
uint64_t flags;
+ // additional flags regarding to dynamic command arguments
+ AdditionalFlagGen flag_gen;
+
+ // static determined key range
CommandKeyRange key_range;
// if key_range.first_key == -1, key_range_gen is used instead
@@ -134,13 +144,14 @@ struct CommandAttributes {
// if key_range.first_key == -2, key_range_vec_gen is used instead
CommandKeyRangeVecGen key_range_vec_gen;
+ // commander object generator
CommanderFactory factory;
- bool IsWrite() const { return (flags & kCmdWrite) != 0; }
- bool IsOkLoading() const { return (flags & kCmdLoading) != 0; }
- bool IsExclusive() const { return (flags & kCmdExclusive) != 0; }
- bool IsMulti() const { return (flags & kCmdMulti) != 0; }
- bool IsNoMulti() const { return (flags & kCmdNoMulti) != 0; }
+ auto GenerateFlags(const std::vector<std::string> &args) const {
+ uint64_t res = flags;
+ if (flag_gen) res |= flag_gen(args);
+ return res;
+ }
};
using CommandMap = std::map<std::string, const CommandAttributes *>;
@@ -184,11 +195,12 @@ inline uint64_t ParseCommandFlags(const std::string
&description, const std::str
template <typename T>
auto MakeCmdAttr(const std::string &name, int arity, const std::string
&description, int first_key, int last_key,
- int key_step) {
+ int key_step, const AdditionalFlagGen &flag_gen = {}) {
CommandAttributes attr{name,
arity,
description,
ParseCommandFlags(description, name),
+ flag_gen,
{first_key, last_key, key_step},
{},
{},
@@ -203,24 +215,33 @@ auto MakeCmdAttr(const std::string &name, int arity,
const std::string &descript
}
template <typename T>
-auto MakeCmdAttr(const std::string &name, int arity, const std::string
&description, const CommandKeyRangeGen &gen) {
- CommandAttributes attr{
- name, arity,
- description, ParseCommandFlags(description, name),
- {-1, 0, 0}, gen,
- {}, []() -> std::unique_ptr<Commander> { return
std::unique_ptr<Commander>(new T()); }};
+auto MakeCmdAttr(const std::string &name, int arity, const std::string
&description, const CommandKeyRangeGen &gen,
+ const AdditionalFlagGen &flag_gen = {}) {
+ CommandAttributes attr{name,
+ arity,
+ description,
+ ParseCommandFlags(description, name),
+ flag_gen,
+ {-1, 0, 0},
+ gen,
+ {},
+ []() -> std::unique_ptr<Commander> { return
std::unique_ptr<Commander>(new T()); }};
return attr;
}
template <typename T>
auto MakeCmdAttr(const std::string &name, int arity, const std::string
&description,
- const CommandKeyRangeVecGen &vec_gen) {
- CommandAttributes attr{
- name, arity,
- description, ParseCommandFlags(description, name),
- {-2, 0, 0}, {},
- vec_gen, []() -> std::unique_ptr<Commander> { return
std::unique_ptr<Commander>(new T()); }};
+ const CommandKeyRangeVecGen &vec_gen, const AdditionalFlagGen
&flag_gen = {}) {
+ CommandAttributes attr{name,
+ arity,
+ description,
+ ParseCommandFlags(description, name),
+ flag_gen,
+ {-2, 0, 0},
+ {},
+ vec_gen,
+ []() -> std::unique_ptr<Commander> { return
std::unique_ptr<Commander>(new T()); }};
return attr;
}
@@ -254,7 +275,8 @@ const CommandMap *GetOriginalCommands();
void GetAllCommandsInfo(std::string *info);
void GetCommandsInfo(std::string *info, const std::vector<std::string>
&cmd_names);
std::string GetCommandInfo(const CommandAttributes *command_attributes);
-Status GetKeysFromCommand(const std::string &name, int argc, std::vector<int>
*keys_indexes);
+Status GetKeysFromCommand(const CommandAttributes *attributes, const
std::vector<std::string> &cmd_tokens,
+ std::vector<int> *keys_indexes);
bool IsCommandExists(const std::string &name);
} // namespace redis
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 70f199d8..a83b4d59 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -290,13 +290,6 @@ void Connection::RecordProfilingSampleIfNeed(const
std::string &cmd, uint64_t du
svr_->GetPerfLog()->PushEntry(std::move(entry));
}
-bool IsSpecialExclusiveCommand(const std::string &cmd_name, const
std::vector<std::string> &cmd_tokens,
- Config *config) {
- return (cmd_name == "config" && cmd_tokens.size() == 2 &&
util::EqualICase(cmd_tokens[1], "set")) ||
- (config->cluster_enabled && (cmd_name == "clusterx" || cmd_name ==
"cluster") && cmd_tokens.size() >= 2 &&
- Cluster::SubCommandIsExecExclusive(cmd_tokens[1]));
-}
-
void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
Config *config = svr_->GetConfig();
std::string reply, password = config->requirepass;
@@ -305,11 +298,12 @@ void
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
auto cmd_tokens = to_process_cmds->front();
to_process_cmds->pop_front();
- if (IsFlagEnabled(redis::Connection::kCloseAfterReply) &&
!IsFlagEnabled(Connection::kMultiExec)) break;
+ bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
+ if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec)
break;
auto s = svr_->LookupAndCreateCommand(cmd_tokens.front(), ¤t_cmd);
if (!s.IsOK()) {
- if (IsFlagEnabled(Connection::kMultiExec)) multi_error_ = true;
+ if (is_multi_exec) multi_error_ = true;
Reply(redis::Error("ERR unknown command " + cmd_tokens.front()));
continue;
}
@@ -329,6 +323,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
const auto attributes = current_cmd->GetAttributes();
auto cmd_name = attributes->name;
+ auto cmd_flags = attributes->GenerateFlags(cmd_tokens);
std::shared_lock<std::shared_mutex> concurrency; // Allow concurrency
std::unique_lock<std::shared_mutex> exclusivity; // Need exclusivity
@@ -336,9 +331,9 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
// that can guarantee other threads can't come into critical zone, such as
DEBUG,
// CLUSTER subcommand, CONFIG SET, MULTI, LUA (in the immediate future).
// Otherwise, we just use 'ConcurrencyGuard' to allow all workers to
execute commands at the same time.
- if (IsFlagEnabled(Connection::kMultiExec) && attributes->name != "exec") {
+ if (is_multi_exec && attributes->name != "exec") {
// No lock guard, because 'exec' command has acquired
'WorkExclusivityGuard'
- } else if (attributes->IsExclusive() ||
IsSpecialExclusiveCommand(cmd_name, cmd_tokens, config)) {
+ } else if (cmd_flags & kCmdExclusive) {
exclusivity = svr_->WorkExclusivityGuard();
// When executing lua script commands that have "exclusive" attribute,
we need to know current connection,
@@ -348,21 +343,21 @@ void
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
concurrency = svr_->WorkConcurrencyGuard();
}
- if (attributes->flags & kCmdROScript) {
+ if (cmd_flags & kCmdROScript) {
// if executing read only lua script commands, set current connection.
svr_->SetCurrentConnection(this);
}
- if (svr_->IsLoading() && !attributes->IsOkLoading()) {
+ if (svr_->IsLoading() && !(cmd_flags & kCmdLoading)) {
Reply(redis::Error("LOADING kvrocks is restoring the db from backup"));
- if (IsFlagEnabled(Connection::kMultiExec)) multi_error_ = true;
+ if (is_multi_exec) multi_error_ = true;
continue;
}
int arity = attributes->arity;
int tokens = static_cast<int>(cmd_tokens.size());
if ((arity > 0 && tokens != arity) || (arity < 0 && tokens < -arity)) {
- if (IsFlagEnabled(Connection::kMultiExec)) multi_error_ = true;
+ if (is_multi_exec) multi_error_ = true;
Reply(redis::Error("ERR wrong number of arguments"));
continue;
}
@@ -370,12 +365,12 @@ void
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
current_cmd->SetArgs(cmd_tokens);
s = current_cmd->Parse();
if (!s.IsOK()) {
- if (IsFlagEnabled(Connection::kMultiExec)) multi_error_ = true;
+ if (is_multi_exec) multi_error_ = true;
Reply(redis::Error("ERR " + s.Msg()));
continue;
}
- if (IsFlagEnabled(Connection::kMultiExec) && attributes->IsNoMulti()) {
+ if (is_multi_exec && (cmd_flags & kCmdNoMulti)) {
std::string no_multi_err = "ERR Can't execute " + attributes->name + "
in MULTI";
Reply(redis::Error(no_multi_err));
multi_error_ = true;
@@ -385,20 +380,20 @@ void
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
if (config->cluster_enabled) {
s = svr_->cluster->CanExecByMySelf(attributes, cmd_tokens, this);
if (!s.IsOK()) {
- if (IsFlagEnabled(Connection::kMultiExec)) multi_error_ = true;
+ if (is_multi_exec) multi_error_ = true;
Reply(redis::Error("ERR " + s.Msg()));
continue;
}
}
// We don't execute commands, but queue them, ant then execute in EXEC
command
- if (IsFlagEnabled(Connection::kMultiExec) && !in_exec_ &&
!attributes->IsMulti()) {
+ if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdMulti)) {
multi_cmds_.emplace_back(cmd_tokens);
Reply(redis::SimpleString("QUEUED"));
continue;
}
- if (config->slave_readonly && svr_->IsSlave() && attributes->IsWrite()) {
+ if (config->slave_readonly && svr_->IsSlave() && (cmd_flags & kCmdWrite)) {
Reply(redis::Error("READONLY You can't write against a read only
slave."));
continue;
}
diff --git a/src/server/server.cc b/src/server/server.cc
index 23719b3f..04e349b1 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -1677,7 +1677,7 @@ void Server::updateAllWatchedKeys() {
}
void Server::UpdateWatchedKeysFromArgs(const std::vector<std::string> &args,
const redis::CommandAttributes &attr) {
- if (attr.IsWrite() && watched_key_size_ > 0) {
+ if ((attr.flags & redis::kCmdWrite) && watched_key_size_ > 0) {
if (attr.key_range.first_key > 0) {
updateWatchedKeysFromRange(args, attr.key_range);
} else if (attr.key_range.first_key == -1) {
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index dc8bd262..3f787f47 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -356,7 +356,8 @@ int RedisGenericCommand(lua_State *lua, int raise_error) {
return raise_error ? RaiseError(lua) : 1;
}
auto attributes = cmd->GetAttributes();
- if (attributes->flags & redis::kCmdNoScript) {
+ auto cmd_flags = attributes->GenerateFlags(args);
+ if (cmd_flags & redis::kCmdNoScript) {
PushError(lua, "This Redis command is not allowed from scripts");
return raise_error ? RaiseError(lua) : 1;
}
@@ -378,7 +379,7 @@ int RedisGenericCommand(lua_State *lua, int raise_error) {
}
}
- if (config->slave_readonly && srv->IsSlave() && attributes->IsWrite()) {
+ if (config->slave_readonly && srv->IsSlave() && (cmd_flags &
redis::kCmdWrite)) {
PushError(lua, "READONLY You can't write against a read only slave.");
return raise_error ? RaiseError(lua) : 1;
}