This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new e445f83 Fix use-after-free and some memory leaks reported by
ASan (#714)
e445f83 is described below
commit e445f8365a8e4798f19b757791f035be232de51d
Author: hulk <[email protected]>
AuthorDate: Wed Jul 20 15:56:10 2022 +0800
Fix use-after-free and some memory leaks reported by ASan (#714)
---
src/main.cc | 1 +
src/redis_cmd.cc | 5 +++--
src/redis_connection.cc | 16 +++++++++++-----
src/redis_connection.h | 7 ++++---
src/redis_request.h | 6 +++---
src/scripting.cc | 1 +
src/server.cc | 13 +++++++++++++
src/status.h | 4 ++++
src/worker.cc | 3 +++
9 files changed, 43 insertions(+), 13 deletions(-)
diff --git a/src/main.cc b/src/main.cc
index 8972a2d..cabd0c8 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -358,6 +358,7 @@ int main(int argc, char* argv[]) {
}
srv->Join();
+ delete srv;
removePidFile(config.pidfile);
google::ShutdownGoogleLogging();
libevent_global_shutdown();
diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc
index f92613d..62ee102 100644
--- a/src/redis_cmd.cc
+++ b/src/redis_cmd.cc
@@ -1577,7 +1577,7 @@ class CommandBPop : public Commander {
timeval tm = {timeout_, 0};
evtimer_add(timer_, &tm);
}
- return Status::OK();
+ return Status(Status::BlockingCmd);
}
rocksdb::Status TryPopFromList() {
@@ -3362,7 +3362,7 @@ class CommandExec : public Commander {
}
// Reply multi length first
- conn->Reply(Redis::MultiLen(conn->GetMultiExecCommands().size()));
+ conn->Reply(Redis::MultiLen(conn->GetMultiExecCommands()->size()));
// Execute multi-exec commands
conn->SetInExec();
conn->ExecuteCommands(conn->GetMultiExecCommands());
@@ -4401,6 +4401,7 @@ class CommandFetchFile : public Commander {
svr->IncrFetchFileThread();
for (auto file : files) {
+ if (svr->IsStopped()) break;
uint64_t file_size = 0, max_replication_bytes = 0;
if (svr->GetConfig()->max_replication_mb > 0) {
max_replication_bytes = (svr->GetConfig()->max_replication_mb*MiB) /
diff --git a/src/redis_connection.cc b/src/redis_connection.cc
index 2c6000f..67a81d4 100644
--- a/src/redis_connection.cc
+++ b/src/redis_connection.cc
@@ -87,7 +87,6 @@ void Connection::OnRead(struct bufferevent *bev, void *ctx) {
return;
}
conn->ExecuteCommands(conn->req_.GetCommands());
- conn->req_.ClearCommands();
if (conn->IsFlagEnabled(kCloseAsync)) {
conn->Close();
}
@@ -307,13 +306,14 @@ void Connection::recordProfilingSampleIfNeed(const
std::string &cmd, uint64_t du
svr_->GetPerfLog()->PushEntry(entry);
}
-void Connection::ExecuteCommands(const std::vector<Redis::CommandTokens>
&to_process_cmds) {
- if (to_process_cmds.empty()) return;
-
+void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
Config *config = svr_->GetConfig();
std::string reply, password = config->requirepass;
- for (auto &cmd_tokens : to_process_cmds) {
+ while (!to_process_cmds->empty()) {
+ auto cmd_tokens = to_process_cmds->front();
+ to_process_cmds->pop_front();
+
if (IsFlagEnabled(Redis::Connection::kCloseAfterReply) &&
!IsFlagEnabled(Connection::kMultiExec)) break;
if (GetNamespace().empty()) {
@@ -426,6 +426,12 @@ void Connection::ExecuteCommands(const
std::vector<Redis::CommandTokens> &to_pro
svr_->SlowlogPushEntryIfNeeded(current_cmd_->Args(), duration);
svr_->stats_.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
svr_->FeedMonitorConns(this, cmd_tokens);
+
+ // Break the execution loop when encountering a blocking command like BLPOP
or BRPOP;
+ // it suspends the connection, which then waits for the wakeup signal.
+ if (s.IsBlockingCommand()) {
+ break;
+ }
// Reply for MULTI
if (!s.IsOK()) {
Reply(Redis::Error("ERR " + s.Msg()));
diff --git a/src/redis_connection.h b/src/redis_connection.h
index 90b64e5..c4c2d3f 100644
--- a/src/redis_connection.h
+++ b/src/redis_connection.h
@@ -21,6 +21,7 @@
#pragma once
#include <vector>
+#include <deque>
#include <string>
#include <utility>
#include <memory>
@@ -102,7 +103,7 @@ class Connection {
evbuffer *Input() { return bufferevent_get_input(bev_); }
evbuffer *Output() { return bufferevent_get_output(bev_); }
bufferevent *GetBufferEvent() { return bev_; }
- void ExecuteCommands(const std::vector<Redis::CommandTokens>
&to_process_cmds);
+ void ExecuteCommands(std::deque<CommandTokens> *to_process_cmds);
bool isProfilingEnabled(const std::string &cmd);
void recordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
void SetImporting() { importing_ = true; }
@@ -113,7 +114,7 @@ class Connection {
bool IsInExec() { return in_exec_; }
bool IsMultiError() { return multi_error_; }
void ResetMultiExec();
- const std::vector<Redis::CommandTokens> &GetMultiExecCommands() { return
multi_cmds_; }
+ std::deque<Redis::CommandTokens> *GetMultiExecCommands() { return
&multi_cmds_; }
std::unique_ptr<Commander> current_cmd_;
std::function<void(int)> close_cb_ = nullptr;
@@ -142,7 +143,7 @@ class Connection {
Server *svr_;
bool in_exec_ = false;
bool multi_error_ = false;
- std::vector<Redis::CommandTokens> multi_cmds_;
+ std::deque<Redis::CommandTokens> multi_cmds_;
bool importing_ = false;
};
diff --git a/src/redis_request.h b/src/redis_request.h
index c7f3e1c..b8d658e 100644
--- a/src/redis_request.h
+++ b/src/redis_request.h
@@ -20,6 +20,7 @@
#pragma once
+#include <deque>
#include <vector>
#include <string>
#include <event2/buffer.h>
@@ -44,8 +45,7 @@ class Request {
// Parse the redis requests (bulk string array format)
Status Tokenize(evbuffer *input);
- const std::vector<CommandTokens> &GetCommands() { return commands_; }
- void ClearCommands() { commands_.clear(); }
+ std::deque<CommandTokens> *GetCommands() { return &commands_; }
private:
// internal states related to parsing
@@ -55,7 +55,7 @@ class Request {
int64_t multi_bulk_len_ = 0;
size_t bulk_len_ = 0;
CommandTokens tokens_;
- std::vector<CommandTokens> commands_;
+ std::deque<CommandTokens> commands_;
Server *svr_;
};
diff --git a/src/scripting.cc b/src/scripting.cc
index 675e829..caaa8e7 100644
--- a/src/scripting.cc
+++ b/src/scripting.cc
@@ -90,6 +90,7 @@ namespace Lua {
}
void DestroyState(lua_State *lua) {
+ lua_gc(lua, LUA_GCCOLLECT, 0);
lua_close(lua);
}
diff --git a/src/server.cc b/src/server.cc
index a837c4a..b48d41b 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -81,6 +81,19 @@ Server::~Server() {
for (const auto &iter : conn_ctxs_) {
delete iter.first;
}
+
+ // Wait for all fetch file threads to stop and exit, and force destroy
+ // the server after 60s.
+ int counter = 0;
+ while (GetFetchFileThreadNum() != 0) {
+ usleep(100000);
+ if (++counter == 600) {
+ LOG(WARNING) << "Will force destroy the server after waiting 60s, leave
" << GetFetchFileThreadNum()
+ << " fetch file threads are still running";
+ break;
+ }
+ }
+ Lua::DestroyState(lua_);
}
// Kvrocks threads list:
diff --git a/src/status.h b/src/status.h
index 8727f1c..21ae3c4 100644
--- a/src/status.h
+++ b/src/status.h
@@ -55,6 +55,9 @@ class Status {
// Network
NetSendErr,
+
+ // Blocking
+ BlockingCmd,
};
Status() : Status(cOK) {}
@@ -62,6 +65,7 @@ class Status {
bool IsOK() { return code_ == cOK; }
bool IsNotFound() { return code_ == NotFound; }
bool IsImorting() { return code_ == SlotImport; }
+ bool IsBlockingCommand() { return code_ == BlockingCmd; }
std::string Msg() {
if (IsOK()) {
return "ok";
diff --git a/src/worker.cc b/src/worker.cc
index d0bb412..5a64167 100644
--- a/src/worker.cc
+++ b/src/worker.cc
@@ -62,6 +62,9 @@ Worker::~Worker() {
for (const auto &iter : conns_) {
conns.emplace_back(iter.second);
}
+ for (const auto &iter : monitor_conns_) {
+ conns.emplace_back(iter.second);
+ }
for (const auto &iter : conns) {
iter->Close();
}