This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 222711fd0 feat(client): Add support for the CLIENT PAUSE/UNPAUSE 
commands to provide a temporary solution for failover (#3378)
222711fd0 is described below

commit 222711fd0e1a6e0f121b3713cacb63bb971b420a
Author: Paragrf <[email protected]>
AuthorDate: Fri Mar 6 12:19:40 2026 +0800

    feat(client): Add support for the CLIENT PAUSE/UNPAUSE commands to provide 
a temporary solution for failover (#3378)
    
    ### Summary
    This PR aims to provide a streamlined solution for graceful failover
    within the cluster. Previously, proposed failover schemes remained
    unmerged due to unresolved complexities and architectural concerns.
    
    To address this, this modification introduces the CLIENT PAUSE and
    CLIENT UNPAUSE commands—capabilities that were previously missing in
    Kvrocks. These commands provide the necessary primitives to temporarily
    suspend client traffic, ensuring data consistency and a seamless
    transition during the failover process.
    
    ### Background
    Based on https://github.com/apache/kvrocks/issues/3377, this PR
    implements two Redis commands that were missing from Kvrocks.
    
    - Implemented CLIENT PAUSE timeout [WRITE|ALL] to block client
    requests. The timeout is required and given in milliseconds; the
    mode defaults to ALL.
    
    - Implemented CLIENT UNPAUSE to resume normal operations.
    
    ### Implementation
    The behavior of CLIENT PAUSE/UNPAUSE is consistent with Redis:
    
    - Blocking Mechanism: After executing CLIENT PAUSE, Kvrocks will block
    connections attempting to run restricted commands.
    
    - Release/Error Handling: Blocked commands are released once the
    timeout expires or CLIENT UNPAUSE is called.
    
    - Role Transition: If a role change occurs (e.g., Master → Slave) during
    the pause, any previously blocked write requests will return a READONLY
    error upon resumption.
    
    - Exemption mechanism: Master-slave replication connections and the
    CLIENT commands themselves (including PAUSE/UNPAUSE) are not blocked.
    
    - Testing: Comprehensive C++ unit tests and Go integration tests have
    been added.
    
    ---------
    
    Co-authored-by: paragrf <[email protected]>
    Co-authored-by: hulk <[email protected]>
---
 src/commands/cmd_server.cc                    |  56 ++++-
 src/server/redis_connection.cc                |  24 +++
 src/server/redis_connection.h                 |   7 +
 src/server/server.cc                          |  83 ++++++++
 src/server/server.h                           |  28 +++
 src/server/worker.cc                          |  32 ++-
 src/server/worker.h                           |   1 +
 tests/cppunit/server_client_pause_test.cc     | 223 ++++++++++++++++++++
 tests/gocase/unit/client/client_pause_test.go | 286 ++++++++++++++++++++++++++
 9 files changed, 734 insertions(+), 6 deletions(-)

diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index be05d8f90..959de5227 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -26,6 +26,7 @@
 #include "commander.h"
 #include "commands/scan_base.h"
 #include "common/io_util.h"
+#include "common/logging.h"
 #include "common/rdb_stream.h"
 #include "common/string_util.h"
 #include "common/time_util.h"
@@ -520,6 +521,34 @@ class CommandClient : public Commander {
       return Status::OK();
     }
 
+    if (subcommand_ == "unpause") {
+      if (args.size() != 2) {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+      return Status::OK();
+    }
+
+    if (subcommand_ == "pause") {
+      if (args.size() < 3 || args.size() > 4) {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+      auto parse_result = ParseInt<uint64_t>(args[2], 10);
+      if (!parse_result) {
+        return {Status::RedisParseErr, errValueNotInteger};
+      }
+      pause_timeout_ms_ = *parse_result;
+      pause_mode_ = PauseMode::kAll;
+      if (args.size() == 4) {
+        std::string mode = util::ToLower(args[3]);
+        if (mode == "write") {
+          pause_mode_ = PauseMode::kWrite;
+        } else if (mode != "all") {
+          return {Status::RedisParseErr, errInvalidSyntax};
+        }
+      }
+      return Status::OK();
+    }
+
     if ((subcommand_ == "kill")) {
       if (args.size() == 2) {
         return {Status::RedisParseErr, errInvalidSyntax};
@@ -572,7 +601,9 @@ class CommandClient : public Commander {
       }
       return Status::OK();
     }
-    return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL 
ip:port|GETNAME|SETNAME|REPLY"};
+    return {Status::RedisInvalidCmd,
+            "Syntax error, try CLIENT LIST|INFO|KILL 
ip:port|GETNAME|SETNAME|REPLY|"
+            "PAUSE|UNPAUSE"};
   }
 
   Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, 
Connection *conn, std::string *output) override {
@@ -611,9 +642,28 @@ class CommandClient : public Commander {
         *output = redis::RESP_OK;
       }
       return Status::OK();
+    } else if (subcommand_ == "pause") {
+      if (!conn->IsAdmin()) {
+        return {Status::RedisExecErr, errAdminPermissionRequired};
+      }
+      uint64_t now_ms = util::GetTimeStampMS();
+      srv->PauseConns(now_ms + pause_timeout_ms_, pause_mode_);
+      WARN("CLIENT PAUSE executed, timeout={}ms, mode={}, addr: {}", 
pause_timeout_ms_,
+           pause_mode_ == PauseMode::kWrite ? "write" : "all", 
conn->GetAddr());
+      *output = redis::RESP_OK;
+      return Status::OK();
+    } else if (subcommand_ == "unpause") {
+      if (!conn->IsAdmin()) {
+        return {Status::RedisExecErr, errAdminPermissionRequired};
+      }
+      srv->UnpauseConns();
+      *output = redis::RESP_OK;
+      return Status::OK();
     }
 
-    return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL 
ip:port|GETNAME|SETNAME|REPLY"};
+    return {Status::RedisInvalidCmd,
+            "Syntax error, try CLIENT LIST|INFO|KILL 
ip:port|GETNAME|SETNAME|REPLY|"
+            "PAUSE|UNPAUSE"};
   }
 
  private:
@@ -625,6 +675,8 @@ class CommandClient : public Commander {
   int64_t kill_type_ = 0;
   uint64_t id_ = 0;
   bool new_format_ = true;
+  uint64_t pause_timeout_ms_ = 0;
+  PauseMode pause_mode_ = PauseMode::kAll;
 };
 
 class CommandMonitor : public Commander {
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 5ff4948e0..f9db92b0c 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -209,6 +209,7 @@ std::string Connection::GetFlags() const {
   if (IsFlagEnabled(kMonitor)) flags.append("M");
   if (IsFlagEnabled(kAsking)) flags.append("A");
   if (!subscribe_channels_.empty() || !subscribe_patterns_.empty()) 
flags.append("P");
+  if (is_paused_) flags.append("z");
   if (flags.empty()) flags = "N";
   return flags;
 }
@@ -226,6 +227,21 @@ bool Connection::CanMigrate() const {
          && subscribe_channels_.empty() && subscribe_patterns_.empty();  // 
not subscribing any channel
 }
 
+void Connection::Pause() {
+  if (is_paused_) return;
+  is_paused_ = true;
+  bufferevent_disable(bev_, EV_READ);
+}
+
+void Connection::Unpause() {
+  if (!is_paused_) return;
+  is_paused_ = false;
+  bufferevent_enable(bev_, EV_READ);
+  // Trigger OnRead so commands buffered in req_ during the pause are 
processed.
+  // Without this, no new data arrives on the socket and OnRead would never 
fire.
+  bufferevent_trigger(bev_, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+}
+
 void Connection::SubscribeChannel(const std::string &channel) {
   for (const auto &chan : subscribe_channels_) {
     if (channel == chan) return;
@@ -451,6 +467,14 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> 
*to_process_cmds) {
     }
 
     auto cmd_flags = attributes->GenerateFlags(cmd_tokens, *config);
+
+    // Push the command back and stop processing; it will be re-executed after 
unpause.
+    if (srv_->PauseConnIfNeeded(this, cmd_name, cmd_flags)) {
+      multi_error_exit.Disable();  // Don't mark transaction as failed - we're 
deferring, not erroring
+      to_process_cmds->push_front(std::move(cmd_tokens));
+      return;
+    }
+
     if (GetNamespace().empty()) {
       if (!password.empty()) {
         if (!(cmd_flags & kCmdAuth)) {
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 462292521..4eba076e6 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -176,6 +176,11 @@ class Connection : public EvbufCallbackBase<Connection> {
   bool IsImporting() const { return importing_; }
   bool CanMigrate() const;
 
+  // CLIENT PAUSE async suspend/resume
+  void Pause();
+  void Unpause();
+  bool IsPaused() const { return is_paused_; }
+
   // Multi exec
   void SetInExec() { in_exec_ = true; }
   bool IsInExec() const { return in_exec_; }
@@ -230,6 +235,8 @@ class Connection : public EvbufCallbackBase<Connection> {
 
   ReplyMode reply_mode_ = ReplyMode::ON;
   std::vector<std::string> queued_replies_;
+
+  bool is_paused_ = false;
 };
 
 }  // namespace redis
diff --git a/src/server/server.cc b/src/server/server.cc
index 5c337a23a..cc1f5b677 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -36,6 +36,7 @@
 #include <memory>
 #include <mutex>
 #include <shared_mutex>
+#include <unordered_set>
 #include <utility>
 
 #include "commands/command_parser.h"
@@ -994,10 +995,92 @@ void Server::cron() {
     }
 
     CleanupExitedSlaves();
+
+    // CLIENT PAUSE timeout check
+    if (conn_pause_end_time_ != 0 && util::GetTimeStampMS() >= 
conn_pause_end_time_) {
+      UnpauseConns();
+    }
+
     recordInstantaneousMetrics();
   }
 }
 
+void Server::PauseConns(uint64_t end_time_ms, PauseMode mode) {
+  std::lock_guard<std::mutex> lock(conn_pause_mu_);
+  conn_pause_end_time_ = end_time_ms;
+  conn_pause_mode_ = mode;
+}
+
+void Server::UnpauseConns() {
+  std::vector<PausedConnEntry> paused_conns;
+  {
+    std::lock_guard<std::mutex> lock(conn_pause_mu_);
+    conn_pause_end_time_ = 0;
+    conn_pause_mode_ = PauseMode::kOff;
+    paused_conns.swap(paused_conns_);
+  }
+  // Resume via the worker so fd+id validation under conns_mu_ serializes with 
FreeConnection.
+  for (auto &entry : paused_conns) {
+    entry.worker->UnpauseConnection(entry.fd, entry.id);
+  }
+}
+
+namespace {
+const std::unordered_set<std::string> kWriteModeSpecialCmds = {
+    "publish",
+    "pfcount",
+    "wait",
+};
+}  // namespace
+
+bool Server::PauseConnIfNeeded(redis::Connection *conn, const std::string 
&cmd_name, uint64_t cmd_flags) {
+  if (conn_pause_end_time_.load() == 0) {
+    return false;
+  }
+  if (conn->GetClientType() & kTypeSlave) {
+    return false;
+  }
+  // CLIENT subcommands (PAUSE/UNPAUSE) are exempt to avoid deadlock.
+  if (util::EqualICase(cmd_name, "client")) {
+    return false;
+  }
+
+  // Re-read mode under the lock to avoid a TOCTOU race with PauseConns.
+  std::lock_guard<std::mutex> lock(conn_pause_mu_);
+  if (conn_pause_end_time_.load() == 0) {
+    return false;
+  }
+
+  auto mode = conn_pause_mode_.load();
+  bool should_pause = false;
+  if (mode == PauseMode::kAll) {
+    should_pause = true;
+  } else if (mode == PauseMode::kWrite) {
+    if (cmd_flags & redis::kCmdWrite) {
+      should_pause = true;
+    } else {
+      should_pause = kWriteModeSpecialCmds.count(cmd_name) > 0;
+    }
+  }
+  if (!should_pause) {
+    return false;
+  }
+
+  if (!conn->IsPaused()) {
+    conn->Pause();
+    paused_conns_.push_back({conn->Owner(), conn->GetFD(), conn->GetID()});
+  }
+  return true;
+}
+
+void Server::RemovePausedConn(redis::Connection *conn) {
+  std::lock_guard<std::mutex> lock(conn_pause_mu_);
+  paused_conns_.erase(
+      std::remove_if(paused_conns_.begin(), paused_conns_.end(),
+                     [conn](const PausedConnEntry &e) { return e.fd == 
conn->GetFD() && e.id == conn->GetID(); }),
+      paused_conns_.end());
+}
+
 Server::InfoEntries Server::GetRocksDBInfo() {
   InfoEntries entries;
   if (is_loading_) return entries;
diff --git a/src/server/server.h b/src/server/server.h
index 845930150..63b83e283 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -25,11 +25,13 @@
 
 #include <array>
 #include <atomic>
+#include <condition_variable>
 #include <cstddef>
 #include <cstdint>
 #include <list>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <set>
 #include <shared_mutex>
 #include <string>
@@ -110,6 +112,12 @@ enum class CursorType : uint8_t {
   kTypeSet = 3,   // cursor for SSCAN
   kTypeZSet = 4,  // cursor for ZSCAN
 };
+
+enum class PauseMode {
+  kOff = 0,
+  kAll = 1,
+  kWrite = 2,
+};
 struct CursorDictElement;
 
 class NumberCursor {
@@ -328,6 +336,13 @@ class Server {
   std::shared_lock<std::shared_mutex> WorkConcurrencyGuard();
   std::unique_lock<std::shared_mutex> WorkExclusivityGuard();
 
+  // CLIENT PAUSE / CLIENT UNPAUSE
+  void PauseConns(uint64_t end_time_ms, PauseMode mode);
+  // Returns true if the connection was suspended (caller must stop processing 
further commands).
+  bool PauseConnIfNeeded(redis::Connection *conn, const std::string &cmd_name, 
uint64_t cmd_flags);
+  void UnpauseConns();
+  void RemovePausedConn(redis::Connection *conn);
+
   Stats stats;
   engine::Storage *storage;
   MemoryProfiler memory_profiler;
@@ -450,4 +465,17 @@ class Server {
   std::atomic<uint16_t> cursor_counter_ = {0};
   using CursorDictType = std::array<CursorDictElement, CURSOR_DICT_SIZE>;
   std::unique_ptr<CursorDictType> cursor_dict_;
+
+  // Conn pause state (CLIENT PAUSE)
+  std::atomic<uint64_t> conn_pause_end_time_{0};
+  std::atomic<PauseMode> conn_pause_mode_{PauseMode::kOff};
+  std::mutex conn_pause_mu_;
+  // Fields are captured while the connection is alive; UnpauseConns never
+  // dereferences the pointer after releasing the lock, preventing 
use-after-free.
+  struct PausedConnEntry {
+    Worker *worker;
+    int fd;
+    uint64_t id;
+  };
+  std::vector<PausedConnEntry> paused_conns_;
 };
diff --git a/src/server/worker.cc b/src/server/worker.cc
index e97e374b5..45eceb705 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -96,6 +96,11 @@ Worker::~Worker() {
     iter->Close();
   }
 
+  for (const auto &lev : listen_events_) {
+    evconnlistener_free(lev);
+  }
+  listen_events_.clear();
+
   timer_.reset();
   if (rate_limit_group_) {
     bufferevent_rate_limit_group_free(rate_limit_group_);
@@ -326,6 +331,7 @@ void Worker::Stop(uint32_t wait_seconds) {
     // It's unnecessary to close the listener fd since we have set the 
LEV_OPT_CLOSE_ON_FREE flag
     evconnlistener_free(lev);
   }
+  listen_events_.clear();
   // wait_seconds == 0 means stop immediately, or it will wait N seconds
   // for the worker to process the remaining requests before stopping.
   if (wait_seconds > 0) {
@@ -381,8 +387,9 @@ redis::Connection *Worker::removeConnection(int fd) {
 // MigrateConnection moves the connection to another worker
 // when reducing the number of workers.
 //
-// To make it simple, we would close the connection if it's
-// blocked on a key or stream.
+// To make it simple, we do not migrate connections that are blocked on a key
+// or stream, or that are paused (CLIENT PAUSE). Such connections stay on the
+// worker being shut down and will be closed when it stops.
 void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
   if (!target || !conn) return;
 
@@ -392,10 +399,15 @@ void Worker::MigrateConnection(Worker *target, 
redis::Connection *conn) {
   // We cannot migrate the connection if it has a running command
   // since it will cause data race since the old worker may still process the 
command.
   if (!conn->CanMigrate()) {
-    // Need to enable read/write event again since we disabled them before
     bufferevent_enable(bev, EV_READ | EV_WRITE);
     return;
   }
+  // Paused connections are not migrated; they will be closed when the worker 
stops.
+  // Only re-enable WRITE here; READ must remain disabled to preserve the 
paused state.
+  if (conn->IsPaused()) {
+    bufferevent_enable(bev, EV_WRITE);
+    return;
+  }
 
   // remove the connection from current worker
   DetachConnection(conn);
@@ -405,8 +417,9 @@ void Worker::MigrateConnection(Worker *target, 
redis::Connection *conn) {
   }
   bufferevent_base_set(target->base_, bev);
   conn->SetCB(bev);
-  bufferevent_enable(bev, EV_READ | EV_WRITE);
+  // SetOwner before bufferevent_enable so callbacks see the correct owner.
   conn->SetOwner(target);
+  bufferevent_enable(bev, EV_READ | EV_WRITE);
 }
 
 void Worker::DetachConnection(redis::Connection *conn) {
@@ -429,6 +442,7 @@ void Worker::FreeConnection(redis::Connection *conn) {
   removeConnection(conn->GetFD());
   srv->ResetWatchedKeys(conn);
   srv->CleanupWaitConnection(conn);
+  if (conn->IsPaused()) srv->RemovePausedConn(conn);
   if (rate_limit_group_) {
     bufferevent_remove_from_rate_limit_group(conn->GetBufferEvent());
   }
@@ -468,6 +482,16 @@ Status Worker::EnableWriteEvent(int fd) {
   return {Status::NotOK, "connection doesn't exist"};
 }
 
+void Worker::UnpauseConnection(int fd, uint64_t id) {
+  std::unique_lock<std::mutex> lock(conns_mu_);
+  auto iter = conns_.find(fd);
+  // Validate that the connection still exists and has the same id to avoid
+  // use-after-free if the connection was freed between pause and unpause.
+  if (iter != conns_.end() && iter->second->GetID() == id) {
+    iter->second->Unpause();
+  }
+}
+
 Status Worker::Reply(int fd, const std::string &reply) {
   std::unique_lock<std::mutex> lock(conns_mu_);
   auto iter = conns_.find(fd);
diff --git a/src/server/worker.h b/src/server/worker.h
index bf4e86b23..0e3ff7c03 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -61,6 +61,7 @@ class Worker : EventCallbackBase<Worker>, 
EvconnlistenerBase<Worker> {
   Status AddConnection(redis::Connection *c);
   Status EnableWriteEvent(int fd);
   Status Reply(int fd, const std::string &reply);
+  void UnpauseConnection(int fd, uint64_t id);
   void BecomeMonitorConn(redis::Connection *conn);
   void QuitMonitorConn(redis::Connection *conn);
   void FeedMonitorConns(redis::Connection *conn, const std::string &response);
diff --git a/tests/cppunit/server_client_pause_test.cc 
b/tests/cppunit/server_client_pause_test.cc
new file mode 100644
index 000000000..e9c5cd43f
--- /dev/null
+++ b/tests/cppunit/server_client_pause_test.cc
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <event2/bufferevent.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "commands/commander.h"
+#include "common/time_util.h"
+#include "server/redis_connection.h"
+#include "server/server.h"
+#include "server/worker.h"
+#include "test_base.h"
+
+class ServerClientPauseTest : public TestBase {
+ protected:
+  void SetUp() override {
+    server_ = std::make_unique<Server>(storage_.get(), storage_->GetConfig());
+    server_->Stop();
+    server_->Join();
+
+    // Create a Worker (port=0 means no TCP listening happens)
+    worker_ = std::make_unique<Worker>(server_.get(), storage_->GetConfig());
+
+    // Create a bufferevent pair so we can construct a Connection without a 
real socket
+    base_ = event_base_new();
+    ASSERT_NE(base_, nullptr);
+    int rc = bufferevent_pair_new(base_, 0, bev_pair_);
+    ASSERT_EQ(rc, 0);
+
+    // Connection takes ownership of bev_pair_[0] by default; disable that so
+    // we can free the pair ourselves in TearDown.
+    conn_ = std::make_unique<redis::Connection>(bev_pair_[0], worker_.get());
+    conn_->NeedNotFreeBufferEvent();
+  }
+
+  void TearDown() override {
+    conn_.reset();
+    bufferevent_free(bev_pair_[0]);
+    bufferevent_free(bev_pair_[1]);
+    event_base_free(base_);
+    worker_.reset();
+    server_.reset();
+  }
+
+  // Returns true if the command is allowed through (not suspended by CLIENT 
PAUSE).
+  bool commandPassesThrough(const std::string &cmd_name, uint64_t cmd_flags) {
+    bool suspended = server_->PauseConnIfNeeded(conn_.get(), cmd_name, 
cmd_flags);
+    if (suspended) {
+      // clean up: resume and remove from paused list so the test stays 
isolated
+      server_->RemovePausedConn(conn_.get());
+      conn_->Unpause();
+    }
+    return !suspended;
+  }
+
+  std::unique_ptr<Server> server_;
+  std::unique_ptr<Worker> worker_;
+  event_base *base_ = nullptr;
+  bufferevent *bev_pair_[2] = {nullptr, nullptr};
+  std::unique_ptr<redis::Connection> conn_;
+};
+
+// ---------------------------------------------------------------------------
+// Group 1: PauseConns / UnpauseConns state management
+// ---------------------------------------------------------------------------
+
+TEST_F(ServerClientPauseTest, SetPauseAllMode) {
+  uint64_t future_ms = util::GetTimeStampMS() + 60000;
+  server_->PauseConns(future_ms, PauseMode::kAll);
+
+  // Write command should be suspended (not pass through).
+  EXPECT_FALSE(commandPassesThrough("set", redis::kCmdWrite));
+  server_->UnpauseConns();
+}
+
+TEST_F(ServerClientPauseTest, UnpauseClearsState) {
+  uint64_t future_ms = util::GetTimeStampMS() + 60000;
+  server_->PauseConns(future_ms, PauseMode::kAll);
+  server_->UnpauseConns();
+
+  // After unpause, commands should pass through immediately.
+  EXPECT_TRUE(commandPassesThrough("set", redis::kCmdWrite));
+  EXPECT_TRUE(commandPassesThrough("get", redis::kCmdReadOnly));
+}
+
+TEST_F(ServerClientPauseTest, SetPauseOverwrite) {
+  uint64_t future_ms1 = util::GetTimeStampMS() + 60000;
+  server_->PauseConns(future_ms1, PauseMode::kAll);
+
+  // Overwrite with a new pause – command should still be suspended.
+  server_->PauseConns(util::GetTimeStampMS() + 60000, PauseMode::kAll);
+  EXPECT_FALSE(commandPassesThrough("set", redis::kCmdWrite));
+  server_->UnpauseConns();
+}
+
+// ---------------------------------------------------------------------------
+// Group 2: PauseConnIfNeeded exemption rules
+// ---------------------------------------------------------------------------
+
+TEST_F(ServerClientPauseTest, NotPausedWhenEndTimeIsZero) {
+  // No pause set – any command should pass through immediately.
+  EXPECT_TRUE(commandPassesThrough("set", redis::kCmdWrite));
+  EXPECT_TRUE(commandPassesThrough("get", redis::kCmdReadOnly));
+}
+
+TEST_F(ServerClientPauseTest, ClientCommandIsExempt) {
+  // "client" subcommands (PAUSE/UNPAUSE) are exempt from pausing to avoid 
deadlock.
+  server_->PauseConns(util::GetTimeStampMS() + 60000, PauseMode::kAll);
+  EXPECT_TRUE(commandPassesThrough("client", 0));
+  server_->UnpauseConns();
+}
+
+TEST_F(ServerClientPauseTest, ReadCommandNotBlockedInWriteMode) {
+  // WRITE mode: read-only commands must not be suspended.
+  server_->PauseConns(util::GetTimeStampMS() + 60000, PauseMode::kWrite);
+  EXPECT_TRUE(commandPassesThrough("get", redis::kCmdReadOnly));
+  server_->UnpauseConns();
+}
+
+TEST_F(ServerClientPauseTest, WriteCommandBlockedInWriteMode) {
+  // WRITE mode: write commands should be suspended.
+  server_->PauseConns(util::GetTimeStampMS() + 60000, PauseMode::kWrite);
+  EXPECT_FALSE(commandPassesThrough("set", redis::kCmdWrite));
+  server_->UnpauseConns();
+}
+
+TEST_F(ServerClientPauseTest, SpecialCommandsBlockedInWriteMode) {
+  // WRITE mode: publish/pfcount/wait are also suspended.
+  server_->PauseConns(util::GetTimeStampMS() + 60000, PauseMode::kWrite);
+  for (const auto &cmd : {"publish", "pfcount", "wait"}) {
+    EXPECT_FALSE(commandPassesThrough(cmd, 0 /* no write flag, but special */))
+        << "Command '" << cmd << "' should be suspended in WRITE mode";
+  }
+  server_->UnpauseConns();
+}
+
+TEST_F(ServerClientPauseTest, AllCommandsBlockedInAllMode) {
+  // ALL mode: even read-only commands are suspended.
+  server_->PauseConns(util::GetTimeStampMS() + 60000, PauseMode::kAll);
+  EXPECT_FALSE(commandPassesThrough("get", redis::kCmdReadOnly));
+  EXPECT_FALSE(commandPassesThrough("set", redis::kCmdWrite));
+  server_->UnpauseConns();
+}
+
+// ---------------------------------------------------------------------------
+// Group 3: CommandClient::Parse tests
+// ---------------------------------------------------------------------------
+
+// Helper: look up the "client" command from the global table and invoke Parse.
+static Status ParseClientCommand(const std::vector<std::string> &args) {
+  auto *cmd_table = redis::CommandTable::Get();
+  auto it = cmd_table->find("client");
+  if (it == cmd_table->end()) {
+    return {Status::NotOK, "client command not found in table"};
+  }
+  auto cmd = it->second->factory();
+  cmd->SetArgs(args);
+  return cmd->Parse(args);
+}
+
+TEST_F(ServerClientPauseTest, ParsePauseBasic) {
+  auto s = ParseClientCommand({"client", "pause", "1000"});
+  EXPECT_TRUE(s.IsOK()) << s.Msg();
+}
+
+TEST_F(ServerClientPauseTest, ParsePauseWithAllMode) {
+  auto s = ParseClientCommand({"client", "pause", "1000", "ALL"});
+  EXPECT_TRUE(s.IsOK()) << s.Msg();
+}
+
+TEST_F(ServerClientPauseTest, ParsePauseWithWriteMode) {
+  auto s = ParseClientCommand({"client", "pause", "1000", "WRITE"});
+  EXPECT_TRUE(s.IsOK()) << s.Msg();
+}
+
+TEST_F(ServerClientPauseTest, ParsePauseInvalidMode) {
+  auto s = ParseClientCommand({"client", "pause", "1000", "READ"});
+  EXPECT_EQ(s.GetCode(), Status::RedisParseErr) << "Expected RedisParseErr for 
unsupported mode READ";
+}
+
+TEST_F(ServerClientPauseTest, ParsePauseMissingTimeout) {
+  auto s = ParseClientCommand({"client", "pause"});
+  EXPECT_EQ(s.GetCode(), Status::RedisParseErr) << "Expected RedisParseErr 
when timeout is missing";
+}
+
+TEST_F(ServerClientPauseTest, ParsePauseNonIntegerTimeout) {
+  auto s = ParseClientCommand({"client", "pause", "abc"});
+  EXPECT_EQ(s.GetCode(), Status::RedisParseErr) << "Expected RedisParseErr for 
non-integer timeout";
+}
+
+TEST_F(ServerClientPauseTest, ParsePauseTooManyArgs) {
+  auto s = ParseClientCommand({"client", "pause", "1000", "ALL", "EXTRA"});
+  EXPECT_EQ(s.GetCode(), Status::RedisParseErr) << "Expected RedisParseErr for 
too many arguments";
+}
+
+TEST_F(ServerClientPauseTest, ParseUnpauseBasic) {
+  auto s = ParseClientCommand({"client", "unpause"});
+  EXPECT_TRUE(s.IsOK()) << s.Msg();
+}
+
+TEST_F(ServerClientPauseTest, ParseUnpauseTooManyArgs) {
+  auto s = ParseClientCommand({"client", "unpause", "extra"});
+  EXPECT_EQ(s.GetCode(), Status::RedisParseErr) << "Expected RedisParseErr for 
extra argument to unpause";
+}
diff --git a/tests/gocase/unit/client/client_pause_test.go 
b/tests/gocase/unit/client/client_pause_test.go
new file mode 100644
index 000000000..232a16e40
--- /dev/null
+++ b/tests/gocase/unit/client/client_pause_test.go
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+       "context"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/redis/go-redis/v9"
+       "github.com/stretchr/testify/require"
+)
+
+func TestClientPause(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{
+               "requirepass": "admin",
+       })
+       defer srv.Close()
+
+       ctx := context.Background()
+
+       adminClient := srv.NewClientWithOption(&redis.Options{Password: 
"admin"})
+       defer func() { require.NoError(t, adminClient.Close()) }()
+
+       // unpauseClient is a separate connection used to send CLIENT UNPAUSE.
+       // It must be a different connection from the one being paused, because
+       // the paused connection has its read events disabled and cannot receive
+       // any new commands until it is resumed.
+       unpauseClient := srv.NewClientWithOption(&redis.Options{Password: 
"admin"})
+       defer func() { require.NoError(t, unpauseClient.Close()) }()
+
+       t.Run("CLIENT PAUSE blocks write commands in ALL mode", func(t 
*testing.T) {
+               require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE", 
"500").Err())
+
+               var wg sync.WaitGroup
+               wg.Add(1)
+               start := time.Now()
+               go func() {
+                       defer wg.Done()
+                       writeClient := 
srv.NewClientWithOption(&redis.Options{Password: "admin"})
+                       defer func() { require.NoError(t, writeClient.Close()) 
}()
+                       require.NoError(t, writeClient.Set(ctx, "k1", "v1", 
0).Err())
+               }()
+               wg.Wait()
+               require.GreaterOrEqual(t, time.Since(start).Milliseconds(), 
int64(400))
+
+               require.NoError(t, unpauseClient.Do(ctx, "CLIENT", 
"UNPAUSE").Err())
+       })
+
+       t.Run("CLIENT UNPAUSE releases paused clients immediately", func(t 
*testing.T) {
+               require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE", 
"10000", "WRITE").Err())
+
+               writeClient := srv.NewClientWithOption(&redis.Options{Password: 
"admin"})
+               defer func() { require.NoError(t, writeClient.Close()) }()
+
+               var wg sync.WaitGroup
+               wg.Add(1)
+               start := time.Now()
+               go func() {
+                       defer wg.Done()
+                       require.NoError(t, writeClient.Set(ctx, "k2", "v2", 
0).Err())
+               }()
+
+               time.Sleep(100 * time.Millisecond)
+               // k2 should not exist yet because the SET is still blocked by 
the pause.
+               require.Equal(t, redis.Nil, unpauseClient.Get(ctx, "k2").Err())
+
+               require.NoError(t, unpauseClient.Do(ctx, "CLIENT", 
"UNPAUSE").Err())
+               wg.Wait()
+               require.Less(t, time.Since(start).Milliseconds(), int64(5000))
+               // After UNPAUSE the blocked SET should have completed, so k2 
must be "v2".
+               val, err := unpauseClient.Get(ctx, "k2").Result()
+               require.NoError(t, err)
+               require.Equal(t, "v2", val)
+       })
+
+       t.Run("CLIENT PAUSE WRITE blocks write but not read commands", func(t *testing.T) {
+               // Seed the key before the pause starts, then pause writes for 2 s.
+               require.NoError(t, adminClient.Set(ctx, "readkey", "hello", 0).Err())
+               require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE", "2000", "WRITE").Err())
+
+               // A GET issued on its own connection must come back with the seeded
+               // value in under one second, i.e. without waiting for the pause to end.
+               reader := srv.NewClientWithOption(&redis.Options{Password: "admin"})
+               defer func() { require.NoError(t, reader.Close()) }()
+
+               begin := time.Now()
+               got, getErr := reader.Get(ctx, "readkey").Result()
+               require.NoError(t, getErr)
+               require.Equal(t, "hello", got)
+               elapsedMs := time.Since(begin).Milliseconds()
+               require.Less(t, elapsedMs, int64(1000))
+
+               // Lift the pause so it does not carry over into the next sub-test.
+               require.NoError(t, unpauseClient.Do(ctx, "CLIENT", "UNPAUSE").Err())
+       })
+
+       t.Run("CLIENT PAUSE requires admin permission", func(t *testing.T) {
+               // Register a namespace token, then connect with it instead of "admin".
+               require.NoError(t, adminClient.Do(ctx, "NAMESPACE", "ADD", "test_ns", "test_token").Err())
+
+               userClient := srv.NewClientWithOption(&redis.Options{Password: "test_token"})
+               defer func() { require.NoError(t, userClient.Close()) }()
+
+               // Both sub-commands must be refused for this connection.
+               restricted := [][]interface{}{
+                       {"CLIENT", "PAUSE", "1000"},
+                       {"CLIENT", "UNPAUSE"},
+               }
+               for _, args := range restricted {
+                       err := userClient.Do(ctx, args...).Err()
+                       require.ErrorContains(t, err, "admin permission required")
+               }
+       })
+
+       t.Run("CLIENT PAUSE with invalid arguments", func(t *testing.T) {
+               // Each entry is a malformed invocation that must be answered with an error:
+               // missing timeout, non-numeric timeout, unknown mode, and a stray argument.
+               malformed := [][]interface{}{
+                       {"CLIENT", "PAUSE"},
+                       {"CLIENT", "PAUSE", "notanumber"},
+                       {"CLIENT", "PAUSE", "1000", "READ"},
+                       {"CLIENT", "UNPAUSE", "extra"},
+               }
+               for _, args := range malformed {
+                       require.Error(t, adminClient.Do(ctx, args...).Err())
+               }
+       })
+
+       t.Run("CLIENT LIST shows z flag for paused connections", func(t *testing.T) {
+               // No mode argument is given, so this PAUSE uses the server's default mode.
+               require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE", "10000").Err())
+
+               pausedClient := srv.NewClientWithOption(&redis.Options{Password: "admin"})
+               defer func() { require.NoError(t, pausedClient.Close()) }()
+
+               // The worker only records the SET error. require.* ends in t.FailNow,
+               // which must run on the test goroutine, so the assertion happens after
+               // wg.Wait() below.
+               var wg sync.WaitGroup
+               var setErr error
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       setErr = pausedClient.Set(ctx, "flagtest", "v", 0).Err()
+               }()
+
+               // Give the goroutine time to send SET and get suspended.
+               time.Sleep(100 * time.Millisecond)
+
+               // While the SET is parked, CLIENT LIST must report a connection with flags=z.
+               list, err := unpauseClient.Do(ctx, "CLIENT", "LIST").Text()
+               require.NoError(t, err)
+               require.Contains(t, list, "flags=z")
+
+               require.NoError(t, unpauseClient.Do(ctx, "CLIENT", "UNPAUSE").Err())
+               wg.Wait()
+               require.NoError(t, setErr)
+       })
+
+       t.Run("CLIENT UNPAUSE from a different connection is not blocked", func(t *testing.T) {
+               require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE", "30000").Err())
+
+               pausedClient := srv.NewClientWithOption(&redis.Options{Password: "admin"})
+               defer func() { require.NoError(t, pausedClient.Close()) }()
+
+               // The worker only records the SET error; asserting with require.* here
+               // would call t.FailNow off the test goroutine, which the testing
+               // package does not allow. The error is checked after wg.Wait().
+               var wg sync.WaitGroup
+               var setErr error
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       setErr = pausedClient.Set(ctx, "unpause_regression", "v", 0).Err()
+               }()
+
+               // Let the SET reach the server and get parked behind the pause.
+               time.Sleep(100 * time.Millisecond)
+
+               // CLIENT UNPAUSE must complete immediately from a separate connection,
+               // not get blocked. This is the regression test for the original bug.
+               start := time.Now()
+               require.NoError(t, unpauseClient.Do(ctx, "CLIENT", "UNPAUSE").Err())
+               require.Less(t, time.Since(start).Milliseconds(), int64(1000))
+
+               wg.Wait()
+               require.NoError(t, setErr)
+       })
+
+       t.Run("CLIENT PAUSE blocks EXEC in MULTI/EXEC", func(t *testing.T) {
+               multiClient := srv.NewClientWithOption(&redis.Options{Password: "admin"})
+               defer func() { require.NoError(t, multiClient.Close()) }()
+
+               // Queue one write inside a transaction before the pause begins.
+               require.NoError(t, multiClient.Do(ctx, "MULTI").Err())
+               require.NoError(t, multiClient.Set(ctx, "multi_pause_key", "1", 0).Err())
+
+               // EXEC has no "write" flag (exclusive bypass-multi slow), so use ALL mode to block it.
+               // PAUSE is issued synchronously on the test goroutine: the pause is then
+               // guaranteed to be active before EXEC is sent, with no reliance on sleeps
+               // for ordering and no require.* call on a goroutine that is never joined.
+               require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE", "3000").Err())
+
+               var wg sync.WaitGroup
+               wg.Add(1)
+               var execErr error
+               // Stamp the start here rather than inside the worker, so a late-scheduled
+               // worker cannot shrink the measured duration below the 400 ms sleep.
+               execStart := time.Now()
+               go func() {
+                       defer wg.Done()
+                       execErr = multiClient.Do(ctx, "EXEC").Err()
+               }()
+
+               time.Sleep(400 * time.Millisecond)
+               require.NoError(t, unpauseClient.Do(ctx, "CLIENT", "UNPAUSE").Err())
+               wg.Wait()
+
+               // EXEC must have succeeded, must not have returned before the 400 ms
+               // sleep ended, and the queued SET must be visible afterwards.
+               require.NoError(t, execErr)
+               require.GreaterOrEqual(t, time.Since(execStart).Milliseconds(), int64(400))
+               val, err := unpauseClient.Get(ctx, "multi_pause_key").Result()
+               require.NoError(t, err)
+               require.Equal(t, "1", val)
+       })
+
+       t.Run("CLIENT PAUSE blocks BLPOP wakeup until UNPAUSE", func(t *testing.T) {
+               // A raw TCP client is used so BLPOP can be written first and its reply
+               // read later, from a separate goroutine.
+               blpopClient := srv.NewTCPClient()
+               defer func() { require.NoError(t, blpopClient.Close()) }()
+               require.NoError(t, blpopClient.WriteArgs("AUTH", "admin"))
+               blpopClient.MustRead(t, "+OK")
+
+               require.NoError(t, adminClient.Del(ctx, "blist").Err())
+               require.NoError(t, blpopClient.WriteArgs("BLPOP", "blist", "0"))
+
+               // Let BLPOP reach the server before writes are paused.
+               time.Sleep(50 * time.Millisecond)
+               require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE", "5000", "WRITE").Err())
+
+               pushClient := srv.NewClientWithOption(&redis.Options{Password: "admin"})
+               defer func() { require.NoError(t, pushClient.Close()) }()
+
+               // NOTE(review): MustReadStrings takes t and presumably asserts internally,
+               // so it may still fail the test from this non-test goroutine. Confirm
+               // against the helper and switch to a non-asserting read if one exists.
+               var wg sync.WaitGroup
+               wg.Add(1)
+               var blpopDone time.Time
+               go func() {
+                       defer wg.Done()
+                       blpopClient.MustReadStrings(t, []string{"blist", "data"})
+                       blpopDone = time.Now()
+               }()
+
+               // The RPUSH worker only records its error. require.* ends in t.FailNow,
+               // which must run on the test goroutine, so it is asserted after the join.
+               var pushWg sync.WaitGroup
+               var pushErr error
+               pushWg.Add(1)
+               go func() {
+                       defer pushWg.Done()
+                       pushErr = pushClient.RPush(ctx, "blist", "data").Err()
+               }()
+
+               time.Sleep(200 * time.Millisecond)
+               unpauseStart := time.Now()
+               require.NoError(t, unpauseClient.Do(ctx, "CLIENT", "UNPAUSE").Err())
+               pushWg.Wait()
+               wg.Wait()
+
+               require.NoError(t, pushErr)
+               // The BLPOP reply must not have been read before UNPAUSE was issued.
+               require.GreaterOrEqual(t, blpopDone.Sub(unpauseStart).Milliseconds(), int64(0))
+       })
+
+       t.Run("CLIENT PAUSE WRITE blocks EVAL until UNPAUSE", func(t *testing.T) {
+               require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE", "2000", "WRITE").Err())
+
+               scriptClient := srv.NewClientWithOption(&redis.Options{Password: "admin"})
+               defer func() { require.NoError(t, scriptClient.Close()) }()
+
+               // Run EVAL in the background; finished is closed once its reply and
+               // error have been stored, so they are safe to read after <-finished.
+               var (
+                       reply     interface{}
+                       scriptErr error
+               )
+               finished := make(chan struct{})
+               begin := time.Now()
+               go func() {
+                       defer close(finished)
+                       reply, scriptErr = scriptClient.Eval(ctx, `return redis.call('ping')`, []string{}).Result()
+               }()
+
+               // Hold the pause for 400 ms, lift it, then wait for EVAL to return.
+               time.Sleep(400 * time.Millisecond)
+               require.NoError(t, unpauseClient.Do(ctx, "CLIENT", "UNPAUSE").Err())
+               <-finished
+
+               // EVAL must succeed with PONG and must not have finished early.
+               require.NoError(t, scriptErr)
+               require.Equal(t, "PONG", reply)
+               elapsedMs := time.Since(begin).Milliseconds()
+               require.GreaterOrEqual(t, elapsedMs, int64(400))
+       })
+}


Reply via email to