This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new a68f07e5 Graceful shutdown the workers when reducing worker threads
(#1863)
a68f07e5 is described below
commit a68f07e50acef4c36c18caaf635a40e2cbb9319b
Author: hulk <[email protected]>
AuthorDate: Fri Nov 17 09:54:03 2023 +0800
Graceful shutdown the workers when reducing worker threads (#1863)
#1855 introduced a data race even though it only migrates non-blocking
connections. For example:
T1: C0 (a connection) is running the command Config::Set on W0 (a worker),
and C1 is running another command on W1. C0 holds the exclusive lock for
executing commands,
and C1 is waiting for that lock inside ExecuteCommands.
T2: C0 migrates C1 to W0 after reducing the number of worker threads, but
C1's ExecuteCommands call on W1 will
continue executing after the migration. So both W0 and W1 can access C1 at
the same time.
To avoid this, I simply don't migrate a connection if it has any running
command, and
I delay shutting down the workers so that old connections have a chance to
finish their running commands.
---
src/commands/blocking_commander.h | 1 -
src/commands/cmd_pubsub.cc | 2 --
src/commands/cmd_stream.cc | 1 -
src/commands/commander.h | 1 -
src/server/redis_connection.cc | 19 ++++++++++++++++--
src/server/redis_connection.h | 5 ++++-
src/server/server.cc | 32 +++++++++++++++++++-----------
src/server/server.h | 2 +-
src/server/worker.cc | 35 +++++++++++++++++++++++----------
src/server/worker.h | 7 +++++--
tests/gocase/unit/config/config_test.go | 9 ++++-----
11 files changed, 77 insertions(+), 37 deletions(-)
diff --git a/src/commands/blocking_commander.h
b/src/commands/blocking_commander.h
index 3353f7d2..537e770f 100644
--- a/src/commands/blocking_commander.h
+++ b/src/commands/blocking_commander.h
@@ -44,7 +44,6 @@ class BlockingCommander : public Commander,
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;
- bool IsBlocking() const override { return true; }
// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc
index 9ec38bee..45272eef 100644
--- a/src/commands/cmd_pubsub.cc
+++ b/src/commands/cmd_pubsub.cc
@@ -82,7 +82,6 @@ void SubscribeCommandReply(std::string *output, const
std::string &name, const s
class CommandSubscribe : public Commander {
public:
- bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (unsigned i = 1; i < args_.size(); i++) {
conn->SubscribeChannel(args_[i]);
@@ -112,7 +111,6 @@ class CommandUnSubscribe : public Commander {
class CommandPSubscribe : public Commander {
public:
- bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (size_t i = 1; i < args_.size(); i++) {
conn->PSubscribeChannel(args_[i]);
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 0f8d6b96..545d438d 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -750,7 +750,6 @@ class CommandXRead : public Commander,
private EvbufCallbackBase<CommandXRead, false>,
private EventCallbackBase<CommandXRead> {
public:
- bool IsBlocking() const override { return true; }
Status Parse(const std::vector<std::string> &args) override {
size_t streams_word_idx = 0;
diff --git a/src/commands/commander.h b/src/commands/commander.h
index c37d1866..1cb4d221 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -70,7 +70,6 @@ class Commander {
void SetAttributes(const CommandAttributes *attributes) { attributes_ =
attributes; }
const CommandAttributes *GetAttributes() const { return attributes_; }
void SetArgs(const std::vector<std::string> &args) { args_ = args; }
- virtual bool IsBlocking() const { return false; }
virtual Status Parse() { return Parse(args_); }
virtual Status Parse(const std::vector<std::string> &args) { return
Status::OK(); }
virtual Status Execute(Server *srv, Connection *conn, std::string *output) {
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 899260ff..a7dec01d 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -34,6 +34,7 @@
#include "commands/blocking_commander.h"
#include "redis_connection.h"
+#include "scope_exit.h"
#include "server.h"
#include "time_util.h"
#include "tls_util.h"
@@ -75,8 +76,9 @@ void Connection::Close() {
void Connection::Detach() { owner_->DetachConnection(this); }
-void Connection::OnRead(bufferevent *bev) {
- DLOG(INFO) << "[connection] on read: " << bufferevent_getfd(bev);
+void Connection::OnRead(struct bufferevent *bev) {
+ is_running_ = true;
+ MakeScopeExit([this] { is_running_ = false; });
SetLastInteraction();
auto s = req_.Tokenize(Input());
@@ -177,6 +179,13 @@ void Connection::DisableFlag(Flag flag) { flags_ &=
(~flag); }
bool Connection::IsFlagEnabled(Flag flag) const { return (flags_ & flag) > 0; }
+bool Connection::CanMigrate() const {
+ return !is_running_ //
reading or writing
+ && !IsFlagEnabled(redis::Connection::kCloseAfterReply) //
close after reply
+ && saved_current_command_ == nullptr //
not executing blocking command like BLPOP
+ && subscribe_channels_.empty() && subscribe_patterns_.empty(); //
not subscribing any channel
+}
+
void Connection::SubscribeChannel(const std::string &channel) {
for (const auto &chan : subscribe_channels_) {
if (channel == chan) return;
@@ -302,6 +311,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec)
break;
+ std::unique_ptr<Commander> current_cmd;
auto s = srv_->LookupAndCreateCommand(cmd_tokens.front(), &current_cmd);
if (!s.IsOK()) {
if (is_multi_exec) multi_error_ = true;
@@ -424,6 +434,11 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
// Break the execution loop when occurring the blocking command like BLPOP
or BRPOP,
// it will suspend the connection and wait for the wakeup signal.
if (s.Is<Status::BlockingCmd>()) {
+ // For the blocking command, it will use the command while resumed from
the suspend state.
+ // So we need to save the command for the next execution.
+ // Migrate connection would also check the saved_current_command_ to
determine whether
+ // the connection can be migrated or not.
+ saved_current_command_ = std::move(current_cmd);
break;
}
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 0274bc33..e38a70fa 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -119,6 +119,7 @@ class Connection : public EvbufCallbackBase<Connection> {
void RecordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
void SetImporting() { importing_ = true; }
bool IsImporting() const { return importing_; }
+ bool CanMigrate() const;
// Multi exec
void SetInExec() { in_exec_ = true; }
@@ -127,7 +128,6 @@ class Connection : public EvbufCallbackBase<Connection> {
void ResetMultiExec();
std::deque<redis::CommandTokens> *GetMultiExecCommands() { return
&multi_cmds_; }
- std::unique_ptr<Commander> current_cmd;
std::function<void(int)> close_cb = nullptr;
std::set<std::string> watched_keys;
@@ -152,12 +152,15 @@ class Connection : public EvbufCallbackBase<Connection> {
bufferevent *bev_;
Request req_;
Worker *owner_;
+ std::unique_ptr<Commander> saved_current_command_;
+
std::vector<std::string> subscribe_channels_;
std::vector<std::string> subscribe_patterns_;
Server *srv_;
bool in_exec_ = false;
bool multi_error_ = false;
+ std::atomic<bool> is_running_ = false;
std::deque<redis::CommandTokens> multi_cmds_;
bool importing_ = false;
diff --git a/src/server/server.cc b/src/server/server.cc
index 03f2eae7..41da1735 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -115,7 +115,7 @@ Server::~Server() {
for (auto &worker_thread : worker_threads_) {
worker_thread.reset();
}
- cleanupExitedWorkerThreads();
+ cleanupExitedWorkerThreads(true /* force */);
CleanupExitedSlaves();
lua::DestroyState(lua_);
@@ -226,7 +226,7 @@ void Server::Stop() {
slaveof_mu_.unlock();
for (const auto &worker : worker_threads_) {
- worker->Stop();
+ worker->Stop(0 /* immediately terminate */);
}
rocksdb::CancelAllBackgroundWork(storage->GetDB(), true);
@@ -739,8 +739,9 @@ void Server::cron() {
storage->SetDBInRetryableIOError(false);
}
- if (counter != 0 && counter % 10 == 0) {
- cleanupExitedWorkerThreads();
+ // check if we need to clean up exited worker threads every 5s
+ if (counter != 0 && counter % 50 == 0) {
+ cleanupExitedWorkerThreads(false);
}
CleanupExitedSlaves();
@@ -1685,12 +1686,12 @@ void Server::AdjustWorkerThreads() {
if (new_worker_threads > worker_threads_.size()) {
delta = new_worker_threads - worker_threads_.size();
increaseWorkerThreads(delta);
- LOG(INFO) << "[server] Increase worker threads to " << new_worker_threads;
+ LOG(INFO) << "[server] Increase worker threads from " <<
worker_threads_.size() << " to " << new_worker_threads;
return;
}
delta = worker_threads_.size() - new_worker_threads;
- LOG(INFO) << "[server] Decrease worker threads to " << new_worker_threads;
+ LOG(INFO) << "[server] Decrease worker threads from " <<
worker_threads_.size() << " to " << new_worker_threads;
decreaseWorkerThreads(delta);
}
@@ -1721,17 +1722,26 @@ void Server::decreaseWorkerThreads(size_t delta) {
auto target_worker = worker_threads_[iter.first %
remain_worker_threads]->GetWorker();
worker_thread->GetWorker()->MigrateConnection(target_worker,
iter.second);
}
- worker_thread->Stop();
+ worker_thread->Stop(10 /* graceful timeout */);
// Don't join the worker thread here, because it may join itself.
recycle_worker_threads_.push(std::move(worker_thread));
}
}
-void Server::cleanupExitedWorkerThreads() {
+void Server::cleanupExitedWorkerThreads(bool force) {
std::unique_ptr<WorkerThread> worker_thread = nullptr;
- while (recycle_worker_threads_.try_pop(worker_thread)) {
- worker_thread->Join();
- worker_thread.reset();
+ auto total = recycle_worker_threads_.unsafe_size();
+ for (size_t i = 0; i < total; i++) {
+ if (!recycle_worker_threads_.try_pop(worker_thread)) {
+ break;
+ }
+ if (worker_thread->IsTerminated() || force) {
+ worker_thread->Join();
+ worker_thread.reset();
+ } else {
+ // Push the worker thread back to the queue if it's still running.
+ recycle_worker_threads_.push(std::move(worker_thread));
+ }
}
}
diff --git a/src/server/server.h b/src/server/server.h
index bdbaacad..2acd0f5d 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -305,7 +305,7 @@ class Server {
void updateAllWatchedKeys();
void increaseWorkerThreads(size_t delta);
void decreaseWorkerThreads(size_t delta);
- void cleanupExitedWorkerThreads();
+ void cleanupExitedWorkerThreads(bool force);
std::atomic<bool> stop_ = false;
std::atomic<bool> is_loading_ = false;
diff --git a/src/server/worker.cc b/src/server/worker.cc
index efc4ddc6..47042d03 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -252,7 +252,8 @@ Status Worker::listenTCP(const std::string &host, uint32_t
port, int backlog) {
}
evutil_make_socket_nonblocking(fd);
- auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_,
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
+ auto lev =
+ NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE
| LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
}
@@ -292,14 +293,22 @@ void Worker::Run(std::thread::id tid) {
if (event_base_dispatch(base_) != 0) {
LOG(ERROR) << "[worker] Failed to run server, err: " << strerror(errno);
}
+ is_terminated_ = true;
}
-void Worker::Stop() {
- event_base_loopbreak(base_);
+void Worker::Stop(uint32_t wait_seconds) {
for (const auto &lev : listen_events_) {
// It's unnecessary to close the listener fd since we have set the
LEV_OPT_CLOSE_ON_FREE flag
evconnlistener_free(lev);
}
+ // wait_seconds == 0 means stop immediately, or it will wait N seconds
+ // for the worker to process the remaining requests before stopping.
+ if (wait_seconds > 0) {
+ timeval tv = {wait_seconds, 0};
+ event_base_loopexit(base_, &tv);
+ } else {
+ event_base_loopbreak(base_);
+ }
}
Status Worker::AddConnection(redis::Connection *c) {
@@ -351,18 +360,24 @@ redis::Connection *Worker::removeConnection(int fd) {
// blocked on a key or stream.
void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
if (!target || !conn) return;
- if (conn->current_cmd != nullptr && conn->current_cmd->IsBlocking()) {
- // don't need to close the connection since destroy worker thread will
close it
+
+ auto bev = conn->GetBufferEvent();
+ // disable read/write event to prevent the connection from being processed
during migration
+ bufferevent_disable(bev, EV_READ | EV_WRITE);
+ // We cannot migrate the connection if it has a running command
+ // since it will cause data race since the old worker may still process the
command.
+ if (!conn->CanMigrate()) {
+ // Need to enable read/write event again since we disabled them before
+ bufferevent_enable(bev, EV_READ | EV_WRITE);
return;
}
+ // remove the connection from current worker
+ DetachConnection(conn);
if (!target->AddConnection(conn).IsOK()) {
- // destroy worker thread will close the connection
+ conn->Close();
return;
}
- // remove the connection from current worker
- DetachConnection(conn);
- auto bev = conn->GetBufferEvent();
bufferevent_base_set(target->base_, bev);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ | EV_WRITE);
@@ -540,7 +555,7 @@ void WorkerThread::Start() {
LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}
-void WorkerThread::Stop() { worker_->Stop(); }
+void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }
void WorkerThread::Join() {
if (auto s = util::ThreadJoin(t_); !s) {
diff --git a/src/server/worker.h b/src/server/worker.h
index 96a6c7cd..a9f618e5 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -50,8 +50,9 @@ class Worker : EventCallbackBase<Worker>,
EvconnlistenerBase<Worker> {
Worker(Worker &&) = delete;
Worker &operator=(const Worker &) = delete;
- void Stop();
+ void Stop(uint32_t wait_seconds);
void Run(std::thread::id tid);
+ bool IsTerminated() const { return is_terminated_; }
void MigrateConnection(Worker *target, redis::Connection *conn);
void DetachConnection(redis::Connection *conn);
@@ -94,6 +95,7 @@ class Worker : EventCallbackBase<Worker>,
EvconnlistenerBase<Worker> {
struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr;
struct ev_token_bucket_cfg *rate_limit_group_cfg_ = nullptr;
lua_State *lua_;
+ std::atomic<bool> is_terminated_ = false;
};
class WorkerThread {
@@ -106,8 +108,9 @@ class WorkerThread {
Worker *GetWorker() { return worker_.get(); }
void Start();
- void Stop();
+ void Stop(uint32_t wait_seconds);
void Join();
+ bool IsTerminated() const { return worker_->IsTerminated(); }
private:
std::thread t_;
diff --git a/tests/gocase/unit/config/config_test.go
b/tests/gocase/unit/config/config_test.go
index 20dcc612..dd880367 100644
--- a/tests/gocase/unit/config/config_test.go
+++ b/tests/gocase/unit/config/config_test.go
@@ -151,10 +151,7 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
defer srv.Close()
ctx := context.Background()
- rdb := srv.NewClientWithOption(&redis.Options{
- MaxIdleConns: 20,
- MaxRetries: -1, // Disable retry to check connections are
alive after config change
- })
+ rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
t.Run("Test dynamic change worker thread", func(t *testing.T) {
@@ -217,12 +214,14 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
go func() {
defer wg.Done()
_ = rdb.XRead(ctx, &redis.XReadArgs{
- Streams: []string{"s1", "s2", "s3"},
+ Streams: []string{"s1", "$"},
Count: 1,
Block: blockingTimeout,
})
}()
+ // sleep a while to make sure all blocking requests are ready
+ time.Sleep(time.Second)
require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", "workers",
"1").Err())
wg.Wait()