This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 6ee41908 feat(conn): move time-consuming ops out of ExecuteCommand for
context (#2611)
6ee41908 is described below
commit 6ee41908514afa95ccc8e422acb802d151a64caa
Author: Twice <[email protected]>
AuthorDate: Sat Oct 19 16:55:42 2024 +0800
feat(conn): move time-consuming ops out of ExecuteCommand for context
(#2611)
---
src/server/redis_connection.cc | 58 ++++++++++++++++++++++--------------------
src/storage/scripting.cc | 14 ++++++----
2 files changed, 40 insertions(+), 32 deletions(-)
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index e742b168..6647c308 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -358,7 +358,6 @@ Status Connection::ExecuteCommand(engine::Context &ctx,
const std::string &cmd_n
srv_->SlowlogPushEntryIfNeeded(&cmd_tokens, duration, this);
srv_->stats.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
- srv_->FeedMonitorConns(this, cmd_tokens);
return s;
}
@@ -498,36 +497,41 @@ void
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
continue;
}
- engine::Context ctx(srv_->storage);
- // TODO: transaction support for index recording
- std::vector<GlobalIndexer::RecordResult> index_records;
- if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(attributes) &&
!config->cluster_enabled) {
- attributes->ForEachKeyRange(
- [&, this](const std::vector<std::string> &args, const
CommandKeyRange &key_range) {
- key_range.ForEachKey(
- [&, this](const std::string &key) {
- auto res = srv_->indexer.Record(ctx, key, ns_);
- if (res.IsOK()) {
- index_records.push_back(*res);
- } else if (!res.Is<Status::NoPrefixMatched>() &&
!res.Is<Status::TypeMismatched>()) {
- LOG(WARNING) << "index recording failed for key: " << key;
- }
- },
- args);
- },
- cmd_tokens);
- }
-
SetLastCmd(cmd_name);
- s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
- // TODO: transaction support for index updating
- for (const auto &record : index_records) {
- auto s = GlobalIndexer::Update(ctx, record);
- if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
- LOG(WARNING) << "index updating failed for key: " << record.key;
+ {
+ engine::Context ctx(srv_->storage);
+
+ // TODO: transaction support for index recording
+ std::vector<GlobalIndexer::RecordResult> index_records;
+ if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(attributes)
&& !config->cluster_enabled) {
+ attributes->ForEachKeyRange(
+ [&, this](const std::vector<std::string> &args, const
CommandKeyRange &key_range) {
+ key_range.ForEachKey(
+ [&, this](const std::string &key) {
+ auto res = srv_->indexer.Record(ctx, key, ns_);
+ if (res.IsOK()) {
+ index_records.push_back(*res);
+ } else if (!res.Is<Status::NoPrefixMatched>() &&
!res.Is<Status::TypeMismatched>()) {
+ LOG(WARNING) << "index recording failed for key: " <<
key;
+ }
+ },
+ args);
+ },
+ cmd_tokens);
+ }
+
+ s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
+ // TODO: transaction support for index updating
+ for (const auto &record : index_records) {
+ auto s = GlobalIndexer::Update(ctx, record);
+ if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
+ LOG(WARNING) << "index updating failed for key: " << record.key;
+ }
}
}
+ srv_->FeedMonitorConns(this, cmd_tokens);
+
// Break the execution loop when occurring the blocking command like BLPOP
or BRPOP,
// it will suspend the connection and wait for the wakeup signal.
if (s.Is<Status::BlockingCmd>()) {
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 6e05e05d..f38d9422 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -814,13 +814,17 @@ int RedisGenericCommand(lua_State *lua, int raise_error) {
std::string output;
// TODO: make it possible for multiple redis commands in lua script to use
the same txn context.
- engine::Context ctx(srv->storage);
- s = conn->ExecuteCommand(ctx, cmd_name, args, cmd.get(), &output);
- if (!s) {
- PushError(lua, s.Msg().data());
- return raise_error ? RaiseError(lua) : 1;
+ {
+ engine::Context ctx(srv->storage);
+ s = conn->ExecuteCommand(ctx, cmd_name, args, cmd.get(), &output);
+ if (!s) {
+ PushError(lua, s.Msg().data());
+ return raise_error ? RaiseError(lua) : 1;
+ }
}
+ srv->FeedMonitorConns(conn, args);
+
RedisProtocolToLuaType(lua, output.data());
return 1;
}