This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new c1009a98 feat(script, function): remove global lock for EVAL, EVALSHA
and FCALL (#2731)
c1009a98 is described below
commit c1009a98f1dd640cffd3e96a11690589ab593535
Author: Twice <[email protected]>
AuthorDate: Sat Jan 25 10:01:18 2025 +0800
feat(script, function): remove global lock for EVAL, EVALSHA and FCALL
(#2731)
---
src/commands/cmd_function.cc | 6 +++---
src/commands/cmd_script.cc | 7 +++----
src/server/server.cc | 19 ++++++++-----------
src/server/server.h | 5 +----
src/server/worker.cc | 7 +++++++
src/server/worker.h | 5 ++++-
src/storage/scripting.cc | 20 ++++++++++----------
src/storage/scripting.h | 4 ++--
tests/gocase/unit/scripting/scripting_test.go | 6 ++++--
9 files changed, 42 insertions(+), 37 deletions(-)
diff --git a/src/commands/cmd_function.cc b/src/commands/cmd_function.cc
index c229cdaa..3854b2b0 100644
--- a/src/commands/cmd_function.cc
+++ b/src/commands/cmd_function.cc
@@ -66,13 +66,13 @@ struct CommandFunction : Commander {
} else if (parser.EatEqICase("listlib")) {
auto libname = GET_OR_RET(parser.TakeStr().Prefixed("expect a library
name"));
- return lua::FunctionListLib(srv, conn, libname, output);
+ return lua::FunctionListLib(conn, libname, output);
} else if (parser.EatEqICase("delete")) {
auto libname = GET_OR_RET(parser.TakeStr());
if (!lua::FunctionIsLibExist(conn, libname)) {
return {Status::NotOK, "no such library"};
}
- auto s = lua::FunctionDelete(ctx, srv, libname);
+ auto s = lua::FunctionDelete(ctx, conn, libname);
if (!s) return s;
*output = RESP_OK;
@@ -112,7 +112,7 @@ uint64_t GenerateFunctionFlags(uint64_t flags, const
std::vector<std::string> &a
REDIS_REGISTER_COMMANDS(Function,
MakeCmdAttr<CommandFunction>("function", -2,
"exclusive no-script", NO_KEY,
GenerateFunctionFlags),
- MakeCmdAttr<CommandFCall<>>("fcall", -3, "exclusive
write no-script", GetScriptEvalKeyRange),
+ MakeCmdAttr<CommandFCall<>>("fcall", -3, "write
no-script", GetScriptEvalKeyRange),
MakeCmdAttr<CommandFCall<true>>("fcall_ro", -3,
"read-only no-script", GetScriptEvalKeyRange));
} // namespace redis
diff --git a/src/commands/cmd_script.cc b/src/commands/cmd_script.cc
index b4cf3539..3030a81e 100644
--- a/src/commands/cmd_script.cc
+++ b/src/commands/cmd_script.cc
@@ -95,7 +95,7 @@ class CommandScript : public Commander {
}
} else if (args_.size() == 3 && subcommand_ == "load") {
std::string sha;
- auto s = lua::CreateFunction(srv, args_[2], &sha, srv->Lua(), true);
+ auto s = lua::CreateFunction(srv, args_[2], &sha, conn->Owner()->Lua(),
true);
if (!s.IsOK()) {
return s;
}
@@ -125,9 +125,8 @@ uint64_t GenerateScriptFlags(uint64_t flags, const
std::vector<std::string> &arg
return flags;
}
-REDIS_REGISTER_COMMANDS(Script,
- MakeCmdAttr<CommandEval>("eval", -3, "exclusive write
no-script", GetScriptEvalKeyRange),
- MakeCmdAttr<CommandEvalSHA>("evalsha", -3, "exclusive
write no-script", GetScriptEvalKeyRange),
+REDIS_REGISTER_COMMANDS(Script, MakeCmdAttr<CommandEval>("eval", -3, "write
no-script", GetScriptEvalKeyRange),
+ MakeCmdAttr<CommandEvalSHA>("evalsha", -3, "write
no-script", GetScriptEvalKeyRange),
MakeCmdAttr<CommandEvalRO>("eval_ro", -3, "read-only
no-script", GetScriptEvalKeyRange),
MakeCmdAttr<CommandEvalSHARO>("evalsha_ro", -3,
"read-only no-script", GetScriptEvalKeyRange),
MakeCmdAttr<CommandScript>("script", -2, "exclusive
no-script", NO_KEY), )
diff --git a/src/server/server.cc b/src/server/server.cc
index e809444b..0c15abef 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -113,7 +113,6 @@ Server::Server(engine::Storage *storage, Config *config)
AdjustOpenFilesLimit();
slow_log_.SetMaxEntries(config->slowlog_max_len);
perf_log_.SetMaxEntries(config->profiling_sample_record_max_len);
- lua_ = lua::CreateState();
}
Server::~Server() {
@@ -134,8 +133,6 @@ Server::~Server() {
}
cleanupExitedWorkerThreads(true /* force */);
CleanupExitedSlaves();
-
- lua::DestroyState(lua_);
}
// Kvrocks threads list:
@@ -1013,7 +1010,10 @@ void Server::GetClientsInfo(std::string *info) {
void Server::GetMemoryInfo(std::string *info) {
int64_t rss = Stats::GetMemoryRSS();
- int memory_lua = lua_gc(lua_, LUA_GCCOUNT, 0) * 1024;
+ int64_t memory_lua = 0;
+ for (auto &wt : worker_threads_) {
+ memory_lua += wt->GetWorker()->GetLuaMemorySize();
+ }
std::string used_memory_rss_human = util::BytesToHuman(rss);
std::string used_memory_lua_human = util::BytesToHuman(memory_lua);
@@ -1733,11 +1733,7 @@ StatusOr<std::unique_ptr<redis::Commander>>
Server::LookupAndCreateCommand(const
return std::move(cmd);
}
-Status Server::ScriptExists(const std::string &sha) {
- if (lua::ScriptExists(lua_, sha)) {
- return Status::OK();
- }
-
+Status Server::ScriptExists(const std::string &sha) const {
std::string body;
return ScriptGet(sha, &body);
}
@@ -1794,8 +1790,9 @@ Status Server::FunctionSetLib(const std::string &func,
const std::string &lib) c
}
void Server::ScriptReset() {
- auto lua = lua_.exchange(lua::CreateState());
- lua::DestroyState(lua);
+ for (auto &wt : worker_threads_) {
+ wt->GetWorker()->LuaReset();
+ }
}
Status Server::ScriptFlush() {
diff --git a/src/server/server.h b/src/server/server.h
index 3c2ab16e..55059d97 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -269,8 +269,7 @@ class Server {
void KillClient(int64_t *killed, const std::string &addr, uint64_t id,
uint64_t type, bool skipme,
redis::Connection *conn);
- lua_State *Lua() { return lua_; }
- Status ScriptExists(const std::string &sha);
+ Status ScriptExists(const std::string &sha) const;
Status ScriptGet(const std::string &sha, std::string *body) const;
Status ScriptSet(const std::string &sha, const std::string &body) const;
void ScriptReset();
@@ -338,8 +337,6 @@ class Server {
std::string last_random_key_cursor_;
std::mutex last_random_key_cursor_mu_;
- std::atomic<lua_State *> lua_;
-
// client counters
std::atomic<uint64_t> client_id_{1};
std::atomic<int> connected_clients_{0};
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 6420d76c..3a5642c3 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -546,6 +546,13 @@ void Worker::KillClient(redis::Connection *self, uint64_t
id, const std::string
}
}
+void Worker::LuaReset() {
+ auto lua = lua_.exchange(lua::CreateState());
+ lua::DestroyState(lua);
+}
+
+int64_t Worker::GetLuaMemorySize() { return (int64_t)lua_gc(lua_, LUA_GCCOUNT,
0) * 1024; }
+
void Worker::KickoutIdleClients(int timeout) {
std::vector<std::pair<int, uint64_t>> to_be_killed_conns;
diff --git a/src/server/worker.h b/src/server/worker.h
index ac88ec8a..bf691840 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -74,6 +74,9 @@ class Worker : EventCallbackBase<Worker>,
EvconnlistenerBase<Worker> {
void TimerCB(int, int16_t events);
lua_State *Lua() { return lua_; }
+ void LuaReset();
+ int64_t GetLuaMemorySize();
+
std::map<int, redis::Connection *> GetConnections() const { return conns_; }
Server *srv;
@@ -95,7 +98,7 @@ class Worker : EventCallbackBase<Worker>,
EvconnlistenerBase<Worker> {
struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr;
struct ev_token_bucket_cfg *rate_limit_group_cfg_ = nullptr;
- lua_State *lua_;
+ std::atomic<lua_State *> lua_;
std::atomic<bool> is_terminated_ = false;
};
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 9676b439..5d9c8860 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -294,14 +294,14 @@ Status FunctionLoad(redis::Connection *conn, const
std::string &script, bool nee
const auto libname = GET_OR_RET(ExtractLibNameFromShebang(first_line));
auto srv = conn->GetServer();
- auto lua = read_only ? conn->Owner()->Lua() : srv->Lua();
+ auto lua = conn->Owner()->Lua();
if (FunctionIsLibExist(conn, libname, need_to_store, read_only)) {
if (!replace) {
return {Status::NotOK, "library already exists, please specify REPLACE
to force load"};
}
engine::Context ctx(srv->storage);
- auto s = FunctionDelete(ctx, srv, libname);
+ auto s = FunctionDelete(ctx, conn, libname);
if (!s) return s;
}
@@ -348,7 +348,7 @@ Status FunctionLoad(redis::Connection *conn, const
std::string &script, bool nee
bool FunctionIsLibExist(redis::Connection *conn, const std::string &libname,
bool need_check_storage, bool read_only) {
auto srv = conn->GetServer();
- auto lua = read_only ? conn->Owner()->Lua() : srv->Lua();
+ auto lua = conn->Owner()->Lua();
lua_getglobal(lua, REDIS_FUNCTION_LIBRARIES);
@@ -382,7 +382,7 @@ bool FunctionIsLibExist(redis::Connection *conn, const
std::string &libname, boo
Status FunctionCall(redis::Connection *conn, const std::string &name, const
std::vector<std::string> &keys,
const std::vector<std::string> &argv, std::string *output,
bool read_only) {
auto srv = conn->GetServer();
- auto lua = read_only ? conn->Owner()->Lua() : srv->Lua();
+ auto lua = conn->Owner()->Lua();
lua_getglobal(lua, "__redis__err__handler");
@@ -526,8 +526,8 @@ Status FunctionListFunc(Server *srv, const
redis::Connection *conn, const std::s
// list detailed information of a specific library
// NOTE: it is required to load the library to lua runtime before listing
(calling this function)
// i.e. it will output nothing if the library is only in storage but not loaded
-Status FunctionListLib(Server *srv, const redis::Connection *conn, const
std::string &libname, std::string *output) {
- auto lua = srv->Lua();
+Status FunctionListLib(redis::Connection *conn, const std::string &libname,
std::string *output) {
+ auto lua = conn->Owner()->Lua();
lua_getglobal(lua, REDIS_FUNCTION_LIBRARIES);
if (lua_isnil(lua, -1)) {
@@ -563,8 +563,8 @@ Status FunctionListLib(Server *srv, const redis::Connection
*conn, const std::st
return Status::OK();
}
-Status FunctionDelete(engine::Context &ctx, Server *srv, const std::string
&name) {
- auto lua = srv->Lua();
+Status FunctionDelete(engine::Context &ctx, redis::Connection *conn, const
std::string &name) {
+ auto lua = conn->Owner()->Lua();
lua_getglobal(lua, REDIS_FUNCTION_LIBRARIES);
if (lua_isnil(lua, -1)) {
@@ -578,7 +578,7 @@ Status FunctionDelete(engine::Context &ctx, Server *srv,
const std::string &name
return {Status::NotOK, "the library does not exist in lua environment"};
}
- auto storage = srv->storage;
+ auto storage = conn->GetServer()->storage;
auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
for (size_t i = 1; i <= lua_objlen(lua, -1); ++i) {
@@ -607,7 +607,7 @@ Status EvalGenericCommand(redis::Connection *conn, const
std::string &body_or_sh
const std::vector<std::string> &argv, bool evalsha,
std::string *output, bool read_only) {
Server *srv = conn->GetServer();
// Use the worker's private Lua VM when entering the read-only mode
- lua_State *lua = read_only ? conn->Owner()->Lua() : srv->Lua();
+ lua_State *lua = conn->Owner()->Lua();
/* We obtain the script SHA1, then check if this function is already
* defined into the Lua state */
diff --git a/src/storage/scripting.h b/src/storage/scripting.h
index 188f855c..a7550a0e 100644
--- a/src/storage/scripting.h
+++ b/src/storage/scripting.h
@@ -75,8 +75,8 @@ Status FunctionCall(redis::Connection *conn, const
std::string &name, const std:
Status FunctionList(Server *srv, const redis::Connection *conn, const
std::string &libname, bool with_code,
std::string *output);
Status FunctionListFunc(Server *srv, const redis::Connection *conn, const
std::string &funcname, std::string *output);
-Status FunctionListLib(Server *srv, const redis::Connection *conn, const
std::string &libname, std::string *output);
-Status FunctionDelete(engine::Context &ctx, Server *srv, const std::string
&name);
+Status FunctionListLib(redis::Connection *conn, const std::string &libname,
std::string *output);
+Status FunctionDelete(engine::Context &ctx, redis::Connection *conn, const
std::string &name);
bool FunctionIsLibExist(redis::Connection *conn, const std::string &libname,
bool need_check_storage = true,
bool read_only = false);
diff --git a/tests/gocase/unit/scripting/scripting_test.go
b/tests/gocase/unit/scripting/scripting_test.go
index 1e97cad5..20aa0760 100644
--- a/tests/gocase/unit/scripting/scripting_test.go
+++ b/tests/gocase/unit/scripting/scripting_test.go
@@ -368,9 +368,11 @@ assert(bit.bor(1,2,4,8,16,32,64,128) == 255)
r1 := rdb.Eval(ctx, "return 1+1", []string{})
require.NoError(t, r1.Err())
require.Equal(t, int64(2), r1.Val())
- r2 := rdb.ScriptExists(ctx,
"a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bd9",
"a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bda")
+ r2 := rdb.ScriptLoad(ctx, "return 1+2")
require.NoError(t, r2.Err())
- require.Equal(t, []bool{true, false}, r2.Val())
+ r3 := rdb.ScriptExists(ctx,
"a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bd9",
"a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bda", r2.Val())
+ require.NoError(t, r3.Err())
+ require.Equal(t, []bool{false, false, true}, r3.Val())
})
t.Run("SCRIPT LOAD - should return SHA as the bulk string", func(t
*testing.T) {