This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new b29ad21d Allow to change the worker thread number in-flight (#1855)
b29ad21d is described below

commit b29ad21d86959a22b577debf3ca233c31dc2d775
Author: hulk <[email protected]>
AuthorDate: Mon Oct 30 09:13:54 2023 +0800

    Allow to change the worker thread number in-flight (#1855)
    
    Scaling up is as simple as starting new threads and adding them to the 
worker pool.
    But scaling down is a bit more complex, since we need to migrate existing
    connections from the to-be-removed workers to the surviving workers. This
    process is NOT so easy because of the blocking commands, which store their
    context (including the fd and the owner pointer) in different global
    variables, so we need to find and remove them. As far as I know, this
    involves the following commands:
    
    - All LIST blocking command
    - Subscribe/PSubscribe
    - XREAD
    
    To keep it simple, we would like to close those connections if the last
    command is a blocking command. We can improve this later if any users
    complain about it.
---
 src/commands/blocking_commander.h       |   1 +
 src/commands/cmd_pubsub.cc              |   2 +
 src/commands/cmd_stream.cc              |   1 +
 src/commands/commander.h                |   1 +
 src/config/config.cc                    | 577 ++++++++++++++++----------------
 src/server/redis_connection.cc          |   1 +
 src/server/redis_connection.h           |   1 +
 src/server/server.cc                    |  83 ++++-
 src/server/server.h                     |   7 +-
 src/server/worker.cc                    |  25 ++
 src/server/worker.h                     |   2 +
 tests/cppunit/config_test.cc            |   2 +-
 tests/gocase/unit/config/config_test.go |  92 +++++
 13 files changed, 494 insertions(+), 301 deletions(-)

diff --git a/src/commands/blocking_commander.h 
b/src/commands/blocking_commander.h
index 537e770f..3353f7d2 100644
--- a/src/commands/blocking_commander.h
+++ b/src/commands/blocking_commander.h
@@ -44,6 +44,7 @@ class BlockingCommander : public Commander,
   // in other words, returning true indicates ending the blocking
   virtual bool OnBlockingWrite() = 0;
 
+  bool IsBlocking() const override { return true; }
   // to start the blocking process
   // usually put to the end of the Execute method
   Status StartBlocking(int64_t timeout, std::string *output) {
diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc
index 352930a0..fb8c5fbc 100644
--- a/src/commands/cmd_pubsub.cc
+++ b/src/commands/cmd_pubsub.cc
@@ -82,6 +82,7 @@ void SubscribeCommandReply(std::string *output, const 
std::string &name, const s
 
 class CommandSubscribe : public Commander {
  public:
+  bool IsBlocking() const override { return true; }
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     for (unsigned i = 1; i < args_.size(); i++) {
       conn->SubscribeChannel(args_[i]);
@@ -111,6 +112,7 @@ class CommandUnSubscribe : public Commander {
 
 class CommandPSubscribe : public Commander {
  public:
+  bool IsBlocking() const override { return true; }
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     for (size_t i = 1; i < args_.size(); i++) {
       conn->PSubscribeChannel(args_[i]);
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 57b21513..e8607d1c 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -664,6 +664,7 @@ class CommandXRead : public Commander,
                      private EvbufCallbackBase<CommandXRead, false>,
                      private EventCallbackBase<CommandXRead> {
  public:
+  bool IsBlocking() const override { return true; }
   Status Parse(const std::vector<std::string> &args) override {
     size_t streams_word_idx = 0;
 
diff --git a/src/commands/commander.h b/src/commands/commander.h
index 758cf993..48bdeafa 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -70,6 +70,7 @@ class Commander {
   void SetAttributes(const CommandAttributes *attributes) { attributes_ = 
attributes; }
   const CommandAttributes *GetAttributes() const { return attributes_; }
   void SetArgs(const std::vector<std::string> &args) { args_ = args; }
+  virtual bool IsBlocking() const { return false; }
   virtual Status Parse() { return Parse(args_); }
   virtual Status Parse(const std::vector<std::string> &args) { return 
Status::OK(); }
   virtual Status Execute(Server *svr, Connection *conn, std::string *output) {
diff --git a/src/config/config.cc b/src/config/config.cc
index 3313eaba..adacc345 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -105,7 +105,7 @@ Config::Config() {
       {"tls-session-cache-timeout", false, new 
IntField(&tls_session_cache_timeout, 300, 0, INT_MAX)},
       {"tls-replication", true, new YesNoField(&tls_replication, false)},
 #endif
-      {"workers", true, new IntField(&workers, 8, 1, 256)},
+      {"workers", false, new IntField(&workers, 8, 1, 256)},
       {"timeout", false, new IntField(&timeout, 0, 0, INT_MAX)},
       {"tcp-backlog", true, new IntField(&backlog, 511, 0, INT_MAX)},
       {"maxclients", false, new IntField(&maxclients, 10240, 0, INT_MAX)},
@@ -359,296 +359,303 @@ void Config::initFieldCallback() {
   };
 #endif
 
-  std::map<std::string, CallbackFn> callbacks = {
-      {"dir",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         db_dir = dir + "/db";
-         {
-           std::lock_guard<std::mutex> lg(this->backup_mu);
-           if (backup_dir.empty()) {
-             backup_dir = dir + "/backup";
-           }
-         }
-         if (log_dir.empty()) log_dir = dir;
-         checkpoint_dir = dir + "/checkpoint";
-         sync_checkpoint_dir = dir + "/sync_checkpoint";
-         backup_sync_dir = dir + "/backup_for_sync";
-         return Status::OK();
-       }},
-      {"backup-dir",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         std::string previous_backup;
-         {
-           // Note: currently, backup_mu_ may block by backing up or purging,
-           //  the command may wait for seconds.
-           std::lock_guard<std::mutex> lg(this->backup_mu);
-           previous_backup = std::move(backup_dir);
-           backup_dir = v;
-         }
-         if (!previous_backup.empty() && srv != nullptr && !srv->IsLoading()) {
-           // LOG(INFO) should be called after log is initialized and server 
is loaded.
-           LOG(INFO) << "change backup dir from " << previous_backup << " to " 
<< v;
-         }
-         return Status::OK();
-       }},
-      {"cluster-enabled",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (cluster_enabled) slot_id_encoded = true;
-         return Status::OK();
-       }},
-      {"bind",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         std::vector<std::string> args = util::Split(v, " \t");
-         binds = std::move(args);
-         return Status::OK();
-       }},
-      {"maxclients",
-       [](Server *srv, const std::string &k, const std::string &v) -> Status {
-         if (!srv) return Status::OK();
-         srv->AdjustOpenFilesLimit();
-         return Status::OK();
-       }},
-      {"slaveof",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (v.empty()) {
-           return Status::OK();
-         }
-         std::vector<std::string> args = util::Split(v, " \t");
-         if (args.size() != 2) return {Status::NotOK, "wrong number of 
arguments"};
-         if (args[0] != "no" && args[1] != "one") {
-           master_host = args[0];
-           auto parse_result = ParseInt<int>(args[1], NumericRange<int>{1, 
PORT_LIMIT - 1}, 10);
-           if (!parse_result) {
-             return {Status::NotOK, "should be between 0 and 65535"};
-           }
-           master_port = *parse_result;
-         }
-         return Status::OK();
-       }},
-      {"profiling-sample-commands",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         std::vector<std::string> cmds = util::Split(v, ",");
-         profiling_sample_all_commands = false;
-         profiling_sample_commands.clear();
-         for (auto const &cmd : cmds) {
-           if (cmd == "*") {
-             profiling_sample_all_commands = true;
+  std::map<std::string, CallbackFn> callbacks =
+      {
+          {"workers",
+           [](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             srv->AdjustWorkerThreads();
+             return Status::OK();
+           }},
+          {"dir",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             db_dir = dir + "/db";
+             {
+               std::lock_guard<std::mutex> lg(this->backup_mu);
+               if (backup_dir.empty()) {
+                 backup_dir = dir + "/backup";
+               }
+             }
+             if (log_dir.empty()) log_dir = dir;
+             checkpoint_dir = dir + "/checkpoint";
+             sync_checkpoint_dir = dir + "/sync_checkpoint";
+             backup_sync_dir = dir + "/backup_for_sync";
+             return Status::OK();
+           }},
+          {"backup-dir",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             std::string previous_backup;
+             {
+               // Note: currently, backup_mu_ may block by backing up or 
purging,
+               //  the command may wait for seconds.
+               std::lock_guard<std::mutex> lg(this->backup_mu);
+               previous_backup = std::move(backup_dir);
+               backup_dir = v;
+             }
+             if (!previous_backup.empty() && srv != nullptr && 
!srv->IsLoading()) {
+               // LOG(INFO) should be called after log is initialized and 
server is loaded.
+               LOG(INFO) << "change backup dir from " << previous_backup << " 
to " << v;
+             }
+             return Status::OK();
+           }},
+          {"cluster-enabled",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (cluster_enabled) slot_id_encoded = true;
+             return Status::OK();
+           }},
+          {"bind",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             std::vector<std::string> args = util::Split(v, " \t");
+             binds = std::move(args);
+             return Status::OK();
+           }},
+          {"maxclients",
+           [](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             srv->AdjustOpenFilesLimit();
+             return Status::OK();
+           }},
+          {"slaveof",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (v.empty()) {
+               return Status::OK();
+             }
+             std::vector<std::string> args = util::Split(v, " \t");
+             if (args.size() != 2) return {Status::NotOK, "wrong number of 
arguments"};
+             if (args[0] != "no" && args[1] != "one") {
+               master_host = args[0];
+               auto parse_result = ParseInt<int>(args[1], NumericRange<int>{1, 
PORT_LIMIT - 1}, 10);
+               if (!parse_result) {
+                 return {Status::NotOK, "should be between 0 and 65535"};
+               }
+               master_port = *parse_result;
+             }
+             return Status::OK();
+           }},
+          {"profiling-sample-commands",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             std::vector<std::string> cmds = util::Split(v, ",");
+             profiling_sample_all_commands = false;
              profiling_sample_commands.clear();
+             for (auto const &cmd : cmds) {
+               if (cmd == "*") {
+                 profiling_sample_all_commands = true;
+                 profiling_sample_commands.clear();
+                 return Status::OK();
+               }
+               if (!redis::CommandTable::IsExists(cmd)) {
+                 return {Status::NotOK, cmd + " is not Kvrocks supported 
command"};
+               }
+               // profiling_sample_commands use command's original name, 
regardless of rename-command directive
+               profiling_sample_commands.insert(cmd);
+             }
              return Status::OK();
-           }
-           if (!redis::CommandTable::IsExists(cmd)) {
-             return {Status::NotOK, cmd + " is not Kvrocks supported command"};
-           }
-           // profiling_sample_commands use command's original name, 
regardless of rename-command directive
-           profiling_sample_commands.insert(cmd);
-         }
-         return Status::OK();
-       }},
-      {"slowlog-max-len",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         srv->GetSlowLog()->SetMaxEntries(slowlog_max_len);
-         return Status::OK();
-       }},
-      {"max-db-size",
-       [](Server *srv, const std::string &k, const std::string &v) -> Status {
-         if (!srv) return Status::OK();
-         srv->storage->CheckDBSizeLimit();
-         return Status::OK();
-       }},
-      {"max-io-mb",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         srv->storage->SetIORateLimit(max_io_mb);
-         return Status::OK();
-       }},
-      {"profiling-sample-record-max-len",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         srv->GetPerfLog()->SetMaxEntries(profiling_sample_record_max_len);
-         return Status::OK();
-       }},
-      {"migrate-speed",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (cluster_enabled) 
srv->slot_migrator->SetMaxMigrationSpeed(migrate_speed);
-         return Status::OK();
-       }},
-      {"migrate-pipeline-size",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (cluster_enabled) 
srv->slot_migrator->SetMaxPipelineSize(pipeline_size);
-         return Status::OK();
-       }},
-      {"migrate-sequence-gap",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (cluster_enabled) 
srv->slot_migrator->SetSequenceGapLimit(sequence_gap);
-         return Status::OK();
-       }},
-      {"log-level",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         FLAGS_minloglevel = log_level;
-         return Status::OK();
-       }},
-      {"log-retention-days",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (util::ToLower(log_dir) == "stdout") {
-           return {Status::NotOK, "can't set the 'log-retention-days' when the 
log dir is stdout"};
-         }
-
-         if (log_retention_days != -1) {
-           google::EnableLogCleaner(log_retention_days);
-         } else {
-           google::DisableLogCleaner();
-         }
-         return Status::OK();
-       }},
-      {"persist-cluster-nodes-enabled",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv || !cluster_enabled) return Status::OK();
-         auto nodes_file_path = NodesFilePath();
-         if (v == "yes") {
-           return srv->cluster->DumpClusterNodes(nodes_file_path);
-         }
-         // Remove the cluster nodes file to avoid stale cluster nodes info
-         remove(nodes_file_path.data());
-         return Status::OK();
-       }},
-      {"repl-namespace-enabled",
-       [](Server *srv, const std::string &k, const std::string &v) -> Status {
-         if (!srv) return Status::OK();
-         return srv->GetNamespace()->LoadAndRewrite();
-       }},
+           }},
+          {"slowlog-max-len",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             srv->GetSlowLog()->SetMaxEntries(slowlog_max_len);
+             return Status::OK();
+           }},
+          {"max-db-size",
+           [](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             srv->storage->CheckDBSizeLimit();
+             return Status::OK();
+           }},
+          {"max-io-mb",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             srv->storage->SetIORateLimit(max_io_mb);
+             return Status::OK();
+           }},
+          {"profiling-sample-record-max-len",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             srv->GetPerfLog()->SetMaxEntries(profiling_sample_record_max_len);
+             return Status::OK();
+           }},
+          {"migrate-speed",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (cluster_enabled) 
srv->slot_migrator->SetMaxMigrationSpeed(migrate_speed);
+             return Status::OK();
+           }},
+          {"migrate-pipeline-size",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (cluster_enabled) 
srv->slot_migrator->SetMaxPipelineSize(pipeline_size);
+             return Status::OK();
+           }},
+          {"migrate-sequence-gap",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (cluster_enabled) 
srv->slot_migrator->SetSequenceGapLimit(sequence_gap);
+             return Status::OK();
+           }},
+          {"log-level",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             FLAGS_minloglevel = log_level;
+             return Status::OK();
+           }},
+          {"log-retention-days",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (util::ToLower(log_dir) == "stdout") {
+               return {Status::NotOK, "can't set the 'log-retention-days' when 
the log dir is stdout"};
+             }
 
-      {"rocksdb.target_file_size_base",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
-                                                            
std::to_string(rocks_db.target_file_size_base * MiB));
-       }},
-      {"rocksdb.write_buffer_size",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
-                                                            
std::to_string(rocks_db.write_buffer_size * MiB));
-       }},
-      {"rocksdb.disable_auto_compactions",
-       [](Server *srv, const std::string &k, const std::string &v) -> Status {
-         if (!srv) return Status::OK();
-         std::string disable_auto_compactions = v == "yes" ? "true" : "false";
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
disable_auto_compactions);
-       }},
-      {"rocksdb.max_total_wal_size",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         return srv->storage->SetDBOption(TrimRocksDbPrefix(k), 
std::to_string(rocks_db.max_total_wal_size * MiB));
-       }},
-      {"rocksdb.enable_blob_files",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         std::string enable_blob_files = rocks_db.enable_blob_files ? "true" : 
"false";
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
enable_blob_files);
-       }},
-      {"rocksdb.min_blob_size",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (!rocks_db.enable_blob_files) {
-           return {Status::NotOK, errBlobDbNotEnabled};
-         }
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
-       }},
-      {"rocksdb.blob_file_size",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (!rocks_db.enable_blob_files) {
-           return {Status::NotOK, errBlobDbNotEnabled};
-         }
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
-                                                            
std::to_string(rocks_db.blob_file_size));
-       }},
-      {"rocksdb.enable_blob_garbage_collection",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (!rocks_db.enable_blob_files) {
-           return {Status::NotOK, errBlobDbNotEnabled};
-         }
-         std::string enable_blob_garbage_collection = v == "yes" ? "true" : 
"false";
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
enable_blob_garbage_collection);
-       }},
-      {"rocksdb.blob_garbage_collection_age_cutoff",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (!rocks_db.enable_blob_files) {
-           return {Status::NotOK, errBlobDbNotEnabled};
-         }
-         int val = 0;
-         auto parse_result = ParseInt<int>(v, 10);
-         if (!parse_result) {
-           return {Status::NotOK, "Illegal blob_garbage_collection_age_cutoff 
value."};
-         }
-         val = *parse_result;
-         if (val < 0 || val > 100) {
-           return {Status::NotOK, "blob_garbage_collection_age_cutoff must >= 
0 and <= 100."};
-         }
+             if (log_retention_days != -1) {
+               google::EnableLogCleaner(log_retention_days);
+             } else {
+               google::DisableLogCleaner();
+             }
+             return Status::OK();
+           }},
+          {"persist-cluster-nodes-enabled",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv || !cluster_enabled) return Status::OK();
+             auto nodes_file_path = NodesFilePath();
+             if (v == "yes") {
+               return srv->cluster->DumpClusterNodes(nodes_file_path);
+             }
+             // Remove the cluster nodes file to avoid stale cluster nodes info
+             remove(nodes_file_path.data());
+             return Status::OK();
+           }},
+          {"repl-namespace-enabled",
+           [](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             return srv->GetNamespace()->LoadAndRewrite();
+           }},
+
+          {"rocksdb.target_file_size_base",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
+                                                                
std::to_string(rocks_db.target_file_size_base * MiB));
+           }},
+          {"rocksdb.write_buffer_size",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
+                                                                
std::to_string(rocks_db.write_buffer_size * MiB));
+           }},
+          {"rocksdb.disable_auto_compactions",
+           [](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             std::string disable_auto_compactions = v == "yes" ? "true" : 
"false";
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
disable_auto_compactions);
+           }},
+          {"rocksdb.max_total_wal_size",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             return srv->storage->SetDBOption(TrimRocksDbPrefix(k), 
std::to_string(rocks_db.max_total_wal_size * MiB));
+           }},
+          {"rocksdb.enable_blob_files",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             std::string enable_blob_files = rocks_db.enable_blob_files ? 
"true" : "false";
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
enable_blob_files);
+           }},
+          {"rocksdb.min_blob_size",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (!rocks_db.enable_blob_files) {
+               return {Status::NotOK, errBlobDbNotEnabled};
+             }
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
+           }},
+          {"rocksdb.blob_file_size",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (!rocks_db.enable_blob_files) {
+               return {Status::NotOK, errBlobDbNotEnabled};
+             }
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
+                                                                
std::to_string(rocks_db.blob_file_size));
+           }},
+          {"rocksdb.enable_blob_garbage_collection",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (!rocks_db.enable_blob_files) {
+               return {Status::NotOK, errBlobDbNotEnabled};
+             }
+             std::string enable_blob_garbage_collection = v == "yes" ? "true" 
: "false";
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
enable_blob_garbage_collection);
+           }},
+          {"rocksdb.blob_garbage_collection_age_cutoff",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (!rocks_db.enable_blob_files) {
+               return {Status::NotOK, errBlobDbNotEnabled};
+             }
+             int val = 0;
+             auto parse_result = ParseInt<int>(v, 10);
+             if (!parse_result) {
+               return {Status::NotOK, "Illegal 
blob_garbage_collection_age_cutoff value."};
+             }
+             val = *parse_result;
+             if (val < 0 || val > 100) {
+               return {Status::NotOK, "blob_garbage_collection_age_cutoff must 
>= 0 and <= 100."};
+             }
 
-         double cutoff = val / 100.0;
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
std::to_string(cutoff));
-       }},
-      {"rocksdb.level_compaction_dynamic_level_bytes",
-       [](Server *srv, const std::string &k, const std::string &v) -> Status {
-         if (!srv) return Status::OK();
-         std::string level_compaction_dynamic_level_bytes = v == "yes" ? 
"true" : "false";
-         return srv->storage->SetDBOption(TrimRocksDbPrefix(k), 
level_compaction_dynamic_level_bytes);
-       }},
-      {"rocksdb.max_bytes_for_level_base",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (!rocks_db.level_compaction_dynamic_level_bytes) {
-           return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet};
-         }
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
-                                                            
std::to_string(rocks_db.max_bytes_for_level_base));
-       }},
-      {"rocksdb.max_bytes_for_level_multiplier",
-       [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
-         if (!srv) return Status::OK();
-         if (!rocks_db.level_compaction_dynamic_level_bytes) {
-           return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet};
-         }
-         return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
-       }},
-      {"rocksdb.max_open_files", set_db_option_cb},
-      {"rocksdb.stats_dump_period_sec", set_db_option_cb},
-      {"rocksdb.delayed_write_rate", set_db_option_cb},
-      {"rocksdb.max_background_compactions", set_db_option_cb},
-      {"rocksdb.max_background_flushes", set_db_option_cb},
-      {"rocksdb.compaction_readahead_size", set_db_option_cb},
-      {"rocksdb.max_background_jobs", set_db_option_cb},
-
-      {"rocksdb.max_write_buffer_number", set_cf_option_cb},
-      {"rocksdb.level0_slowdown_writes_trigger", set_cf_option_cb},
-      {"rocksdb.level0_stop_writes_trigger", set_cf_option_cb},
-      {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb},
-      {"rocksdb.compression", set_compression_type_cb},
+             double cutoff = val / 100.0;
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), 
std::to_string(cutoff));
+           }},
+          {"rocksdb.level_compaction_dynamic_level_bytes",
+           [](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             std::string level_compaction_dynamic_level_bytes = v == "yes" ? 
"true" : "false";
+             return srv->storage->SetDBOption(TrimRocksDbPrefix(k), 
level_compaction_dynamic_level_bytes);
+           }},
+          {"rocksdb.max_bytes_for_level_base",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (!rocks_db.level_compaction_dynamic_level_bytes) {
+               return {Status::NotOK, 
errLevelCompactionDynamicLevelBytesNotSet};
+             }
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
+                                                                
std::to_string(rocks_db.max_bytes_for_level_base));
+           }},
+          {"rocksdb.max_bytes_for_level_multiplier",
+           [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
+             if (!srv) return Status::OK();
+             if (!rocks_db.level_compaction_dynamic_level_bytes) {
+               return {Status::NotOK, 
errLevelCompactionDynamicLevelBytesNotSet};
+             }
+             return 
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
+           }},
+          {"rocksdb.max_open_files", set_db_option_cb},
+          {"rocksdb.stats_dump_period_sec", set_db_option_cb},
+          {"rocksdb.delayed_write_rate", set_db_option_cb},
+          {"rocksdb.max_background_compactions", set_db_option_cb},
+          {"rocksdb.max_background_flushes", set_db_option_cb},
+          {"rocksdb.compaction_readahead_size", set_db_option_cb},
+          {"rocksdb.max_background_jobs", set_db_option_cb},
+
+          {"rocksdb.max_write_buffer_number", set_cf_option_cb},
+          {"rocksdb.level0_slowdown_writes_trigger", set_cf_option_cb},
+          {"rocksdb.level0_stop_writes_trigger", set_cf_option_cb},
+          {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb},
+          {"rocksdb.compression", set_compression_type_cb},
 #ifdef ENABLE_OPENSSL
-      {"tls-cert-file", set_tls_option},
-      {"tls-key-file", set_tls_option},
-      {"tls-key-file-pass", set_tls_option},
-      {"tls-ca-cert-file", set_tls_option},
-      {"tls-ca-cert-dir", set_tls_option},
-      {"tls-protocols", set_tls_option},
-      {"tls-auth-clients", set_tls_option},
-      {"tls-ciphers", set_tls_option},
-      {"tls-ciphersuites", set_tls_option},
-      {"tls-prefer-server-ciphers", set_tls_option},
-      {"tls-session-caching", set_tls_option},
-      {"tls-session-cache-size", set_tls_option},
-      {"tls-session-cache-timeout", set_tls_option},
+          {"tls-cert-file", set_tls_option},
+          {"tls-key-file", set_tls_option},
+          {"tls-key-file-pass", set_tls_option},
+          {"tls-ca-cert-file", set_tls_option},
+          {"tls-ca-cert-dir", set_tls_option},
+          {"tls-protocols", set_tls_option},
+          {"tls-auth-clients", set_tls_option},
+          {"tls-ciphers", set_tls_option},
+          {"tls-ciphersuites", set_tls_option},
+          {"tls-prefer-server-ciphers", set_tls_option},
+          {"tls-session-caching", set_tls_option},
+          {"tls-session-cache-size", set_tls_option},
+          {"tls-session-cache-timeout", set_tls_option},
 #endif
-  };
+      };
   for (const auto &iter : callbacks) {
     auto field_iter = fields_.find(iter.first);
     if (field_iter != fields_.end()) {
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 1d96d9d6..c1ef9707 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -32,6 +32,7 @@
 #include <event2/bufferevent_ssl.h>
 #endif
 
+#include "commands/blocking_commander.h"
 #include "redis_connection.h"
 #include "server.h"
 #include "time_util.h"
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 7086bb56..8461cc63 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -109,6 +109,7 @@ class Connection : public EvbufCallbackBase<Connection> {
   bool IsNeedFreeBufferEvent() const { return need_free_bev_; }
 
   Worker *Owner() { return owner_; }
+  void SetOwner(Worker *new_owner) { owner_ = new_owner; };
   int GetFD() { return bufferevent_getfd(bev_); }
   evbuffer *Input() { return bufferevent_get_input(bev_); }
   evbuffer *Output() { return bufferevent_get_output(bev_); }
diff --git a/src/server/server.cc b/src/server/server.cc
index 89c121a2..4f860152 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -111,10 +111,12 @@ Server::~Server() {
       break;
     }
   }
-  // Manually reset workers here to avoid accessing the conn_ctxs_ after it's freed
+
   for (auto &worker_thread : worker_threads_) {
     worker_thread.reset();
   }
+  cleanupExitedWorkerThreads();
+  CleanupExitedSlaves();
 
   lua::DestroyState(lua_);
 }
@@ -232,10 +234,6 @@ void Server::Stop() {
 }
 
 void Server::Join() {
-  for (const auto &worker : worker_threads_) {
-    worker->Join();
-  }
-
   if (auto s = util::ThreadJoin(cron_thread_); !s) {
     LOG(WARNING) << "Cron thread operation failed: " << s.Msg();
   }
@@ -245,6 +243,9 @@ void Server::Join() {
   if (auto s = task_runner_.Join(); !s) {
     LOG(WARNING) << s.Msg();
   }
+  for (const auto &worker : worker_threads_) {
+    worker->Join();
+  }
 }
 
Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reconnect) {
@@ -417,8 +418,6 @@ void Server::SubscribeChannel(const std::string &channel, redis::Connection *con
   std::lock_guard<std::mutex> guard(pubsub_channels_mu_);
 
   auto conn_ctx = ConnContext(conn->Owner(), conn->GetFD());
-  conn_ctxs_[conn_ctx] = true;
-
  if (auto iter = pubsub_channels_.find(channel); iter == pubsub_channels_.end()) {
     pubsub_channels_.emplace(channel, std::list<ConnContext>{conn_ctx});
   } else {
@@ -436,7 +435,6 @@ void Server::UnsubscribeChannel(const std::string &channel, redis::Connection *c
 
   for (const auto &conn_ctx : iter->second) {
     if (conn->GetFD() == conn_ctx.fd && conn->Owner() == conn_ctx.owner) {
-      conn_ctxs_.erase(conn_ctx);
       iter->second.remove(conn_ctx);
       if (iter->second.empty()) {
         pubsub_channels_.erase(iter);
@@ -473,8 +471,6 @@ void Server::PSubscribeChannel(const std::string &pattern, redis::Connection *co
   std::lock_guard<std::mutex> guard(pubsub_channels_mu_);
 
   auto conn_ctx = ConnContext(conn->Owner(), conn->GetFD());
-  conn_ctxs_[conn_ctx] = true;
-
  if (auto iter = pubsub_patterns_.find(pattern); iter == pubsub_patterns_.end()) {
     pubsub_patterns_.emplace(pattern, std::list<ConnContext>{conn_ctx});
   } else {
@@ -492,7 +488,6 @@ void Server::PUnsubscribeChannel(const std::string &pattern, redis::Connection *
 
   for (const auto &conn_ctx : iter->second) {
     if (conn->GetFD() == conn_ctx.fd && conn->Owner() == conn_ctx.owner) {
-      conn_ctxs_.erase(conn_ctx);
       iter->second.remove(conn_ctx);
       if (iter->second.empty()) {
         pubsub_patterns_.erase(iter);
@@ -506,7 +501,6 @@ void Server::BlockOnKey(const std::string &key, redis::Connection *conn) {
   std::lock_guard<std::mutex> guard(blocking_keys_mu_);
 
   auto conn_ctx = ConnContext(conn->Owner(), conn->GetFD());
-  conn_ctxs_[conn_ctx] = true;
 
   if (auto iter = blocking_keys_.find(key); iter == blocking_keys_.end()) {
     blocking_keys_.emplace(key, std::list<ConnContext>{conn_ctx});
@@ -527,7 +521,6 @@ void Server::UnblockOnKey(const std::string &key, redis::Connection *conn) {
 
   for (const auto &conn_ctx : iter->second) {
     if (conn->GetFD() == conn_ctx.fd && conn->Owner() == conn_ctx.owner) {
-      conn_ctxs_.erase(conn_ctx);
       iter->second.remove(conn_ctx);
       if (iter->second.empty()) {
         blocking_keys_.erase(iter);
@@ -596,7 +589,6 @@ void Server::WakeupBlockingConns(const std::string &key, size_t n_conns) {
     if (!s.IsOK()) {
      LOG(ERROR) << "[server] Failed to enable write event on blocked client " << conn_ctx.fd << ": " << s.Msg();
     }
-    conn_ctxs_.erase(conn_ctx);
     iter->second.pop_front();
   }
 }
@@ -747,6 +739,10 @@ void Server::cron() {
       storage->SetDBInRetryableIOError(false);
     }
 
+    if (counter != 0 && counter % 10 == 0) {
+      cleanupExitedWorkerThreads();
+    }
+
     CleanupExitedSlaves();
     recordInstantaneousMetrics();
   }
@@ -1680,6 +1676,65 @@ void Server::AdjustOpenFilesLimit() {
   }
 }
 
+void Server::AdjustWorkerThreads() {
+  auto new_worker_threads = static_cast<size_t>(config_->workers);
+  if (new_worker_threads == worker_threads_.size()) {
+    return;
+  }
+  size_t delta = 0;
+  if (new_worker_threads > worker_threads_.size()) {
+    delta = new_worker_threads - worker_threads_.size();
+    increaseWorkerThreads(delta);
+    LOG(INFO) << "[server] Increase worker threads to " << new_worker_threads;
+    return;
+  }
+
+  delta = worker_threads_.size() - new_worker_threads;
+  LOG(INFO) << "[server] Decrease worker threads to " << new_worker_threads;
+  decreaseWorkerThreads(delta);
+}
+
+void Server::increaseWorkerThreads(size_t delta) {
+  std::vector<std::unique_ptr<WorkerThread>> new_threads;
+  for (size_t i = 0; i < delta; i++) {
+    auto worker = std::make_unique<Worker>(this, config_);
+    auto worker_thread = std::make_unique<WorkerThread>(std::move(worker));
+    worker_thread->Start();
+    worker_threads_.emplace_back(std::move(worker_thread));
+  }
+}
+
+void Server::decreaseWorkerThreads(size_t delta) {
+  auto current_worker_threads = worker_threads_.size();
+  DCHECK(current_worker_threads > delta);
+  auto remain_worker_threads = current_worker_threads - delta;
+  for (size_t i = remain_worker_threads; i < current_worker_threads; i++) {
+    // Unix socket will be listening on the first worker,
+    // so it MUST remove workers from the end of the vector.
+    // Otherwise, the unix socket will be closed.
+    auto worker_thread = std::move(worker_threads_.back());
+    worker_threads_.pop_back();
+    // Migrate connections to other workers before stopping the worker,
+    // we use round-robin to choose the target worker here.
+    auto connections = worker_thread->GetWorker()->GetConnections();
+    for (const auto &iter : connections) {
+      auto target_worker = worker_threads_[iter.first % remain_worker_threads]->GetWorker();
+      worker_thread->GetWorker()->MigrateConnection(target_worker, iter.second);
+    }
+    worker_thread->Stop();
+    // Don't join the worker thread here, because it may join itself.
+    recycle_worker_threads_.push(std::move(worker_thread));
+  }
+}
+
+void Server::cleanupExitedWorkerThreads() {
+  std::unique_ptr<WorkerThread> worker_thread = nullptr;
+  while (recycle_worker_threads_.try_pop(worker_thread)) {
+    worker_thread->Join();
+    worker_thread.reset();
+  }
+}
+
 std::string ServerLogData::Encode() const {
   if (type_ == kReplIdLog) {
     return std::string(1, kReplIdTag) + " " + content_;
diff --git a/src/server/server.h b/src/server/server.h
index b6ac94ec..bdbaacad 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -21,6 +21,7 @@
 #pragma once
 
 #include <inttypes.h>
+#include <tbb/concurrent_vector.h>
 
 #include <array>
 #include <atomic>
@@ -178,6 +179,7 @@ class Server {
   Config *GetConfig() { return config_; }
  static Status LookupAndCreateCommand(const std::string &cmd_name, std::unique_ptr<redis::Commander> *cmd);
   void AdjustOpenFilesLimit();
+  void AdjustWorkerThreads();
 
  Status AddMaster(const std::string &host, uint32_t port, bool force_reconnect);
   Status RemoveMaster();
@@ -301,6 +303,9 @@ class Server {
   Status autoResizeBlockAndSST();
  void updateWatchedKeysFromRange(const std::vector<std::string> &args, const redis::CommandKeyRange &range);
   void updateAllWatchedKeys();
+  void increaseWorkerThreads(size_t delta);
+  void decreaseWorkerThreads(size_t delta);
+  void cleanupExitedWorkerThreads();
 
   std::atomic<bool> stop_ = false;
   std::atomic<bool> is_loading_ = false;
@@ -343,7 +348,6 @@ class Server {
   LogCollector<SlowEntry> slow_log_;
   LogCollector<PerfEntry> perf_log_;
 
-  std::map<ConnContext, bool> conn_ctxs_;
   std::map<std::string, std::list<ConnContext>> pubsub_channels_;
   std::map<std::string, std::list<ConnContext>> pubsub_patterns_;
   std::mutex pubsub_channels_mu_;
@@ -362,6 +366,7 @@ class Server {
   TaskRunner task_runner_;
   std::vector<std::unique_ptr<WorkerThread>> worker_threads_;
   std::unique_ptr<ReplicationThread> replication_thread_;
+  tbb::concurrent_queue<std::unique_ptr<WorkerThread>> recycle_worker_threads_;
 
   // memory
   std::atomic<int64_t> memory_startup_use_ = 0;
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 9506d566..7515a2f1 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -344,6 +344,31 @@ redis::Connection *Worker::removeConnection(int fd) {
   return conn;
 }
 
+// MigrateConnection moves the connection to another worker
+// when reducing the number of workers.
+//
+// To make it simple, we would close the connection if it's
+// blocked on a key or stream.
+void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
+  if (!target || !conn) return;
+  if (conn->current_cmd != nullptr && conn->current_cmd->IsBlocking()) {
+    // don't need to close the connection since destroy worker thread will close it
+    return;
+  }
+
+  if (!target->AddConnection(conn).IsOK()) {
+    // destroy worker thread will close the connection
+    return;
+  }
+  // remove the connection from current worker
+  DetachConnection(conn);
+  auto bev = conn->GetBufferEvent();
+  bufferevent_base_set(target->base_, bev);
+  conn->SetCB(bev);
+  bufferevent_enable(bev, EV_READ | EV_WRITE);
+  conn->SetOwner(target);
+}
+
 void Worker::DetachConnection(redis::Connection *conn) {
   if (!conn) return;
 
diff --git a/src/server/worker.h b/src/server/worker.h
index 70cdace2..a9fe7dd2 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -53,6 +53,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
   void Stop();
   void Run(std::thread::id tid);
 
+  void MigrateConnection(Worker *target, redis::Connection *conn);
   void DetachConnection(redis::Connection *conn);
   void FreeConnection(redis::Connection *conn);
   void FreeConnectionByID(int fd, uint64_t id);
@@ -72,6 +73,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
   void TimerCB(int, int16_t events);
 
   lua_State *Lua() { return lua_; }
+  std::map<int, redis::Connection *> GetConnections() const { return conns_; }
   Server *svr;
 
  private:
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index a54868c7..49e7623b 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -36,6 +36,7 @@ TEST(Config, GetAndSet) {
   auto s = config.Load(CLIOptions(path));
   EXPECT_FALSE(s.IsOK());
   std::map<std::string, std::string> mutable_cases = {
+      {"workers", "4"},
       {"log-level", "info"},
       {"timeout", "1000"},
       {"maxclients", "2000"},
@@ -108,7 +109,6 @@ TEST(Config, GetAndSet) {
       {"daemonize", "yes"},
       {"bind", "0.0.0.0"},
       {"repl-bind", "0.0.0.0"},
-      {"workers", "8"},
       {"repl-workers", "8"},
       {"tcp-backlog", "500"},
       {"slaveof", "no one"},
diff --git a/tests/gocase/unit/config/config_test.go b/tests/gocase/unit/config/config_test.go
index c6c9b682..20dcc612 100644
--- a/tests/gocase/unit/config/config_test.go
+++ b/tests/gocase/unit/config/config_test.go
@@ -25,10 +25,13 @@ import (
        "os"
        "path/filepath"
        "sort"
+       "strconv"
+       "sync"
        "testing"
        "time"
 
        "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/redis/go-redis/v9"
        "github.com/stretchr/testify/require"
 )
 
@@ -141,3 +144,92 @@ func TestStartWithoutConfigurationFile(t *testing.T) {
        require.NoError(t, rdb.Do(ctx, "SET", "foo", "bar").Err())
        require.Equal(t, "bar", rdb.Do(ctx, "GET", "foo").Val())
 }
+
+func TestDynamicChangeWorkerThread(t *testing.T) {
+       configs := map[string]string{}
+       srv := util.StartServer(t, configs)
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClientWithOption(&redis.Options{
+               MaxIdleConns: 20,
+               MaxRetries:   -1, // Disable retry to check connections are alive after config change
+       })
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("Test dynamic change worker thread", func(t *testing.T) {
+               runCommands := func(workers int) {
+                       var wg sync.WaitGroup
+                       require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", "workers", strconv.Itoa(workers)).Err())
+                       for i := 0; i < 10; i++ {
+                               wg.Add(1)
+                               go func() {
+                                       defer wg.Done()
+                                       for j := 0; j < 10; j++ {
+                                               require.NoError(t, rdb.Set(ctx, "foo", "bar", 0).Err())
+                                       }
+                               }()
+                       }
+                       wg.Wait()
+               }
+               // Reduce worker threads to 4
+               runCommands(4)
+
+               // Reduce worker threads to 1
+               runCommands(1)
+
+               // Increase worker threads to 12
+               runCommands(12)
+       })
+
+       t.Run("Test dynamic change worker thread with blocking requests", func(t *testing.T) {
+               ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+               defer cancel()
+
+               blockingTimeout := 5 * time.Second
+
+               var wg sync.WaitGroup
+               // blocking on list
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       _ = rdb.BLPop(ctx, blockingTimeout, "list")
+               }()
+
+               // channel subscribe
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       sub := rdb.Subscribe(ctx, "c1", "c2", "c3")
+                       _, _ = sub.ReceiveTimeout(ctx, blockingTimeout)
+               }()
+
+               // pattern subscribe
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       sub := rdb.PSubscribe(ctx, "c1", "c2", "c3")
+                       _, _ = sub.ReceiveTimeout(ctx, blockingTimeout)
+               }()
+
+               // blocking on stream
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       _ = rdb.XRead(ctx, &redis.XReadArgs{
+                               Streams: []string{"s1", "s2", "s3"},
+                               Count:   1,
+                               Block:   blockingTimeout,
+                       })
+               }()
+
+               require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", "workers", "1").Err())
+               wg.Wait()
+
+               // We don't care about the result of these commands since we can't tell if the connection
+               // is migrated or not. We just want to confirm that server works well if there have blocking
+               // requests after changing worker threads.
+               require.NoError(t, rdb.Set(ctx, "foo", "bar", 0).Err())
+               require.Equal(t, "bar", rdb.Get(ctx, "foo").Val())
+       })
+}

Reply via email to