torwig commented on code in PR #2498:
URL: https://github.com/apache/kvrocks/pull/2498#discussion_r1731529953


##########
src/server/redis_connection.cc:
##########
@@ -97,6 +103,9 @@ void Connection::OnRead(struct bufferevent *bev) {
   if (IsFlagEnabled(kCloseAsync)) {
     Close();
   }
+  if (IsReachOBufLimit()) {

Review Comment:
   We can expand the letter `O` to `Output` and have something like 
`IsOutputBufferLimitReached`.



##########
src/server/redis_connection.cc:
##########
@@ -559,4 +652,86 @@ void Connection::ResetMultiExec() {
   DisableFlag(Connection::kMultiExec);
 }
 
+bool Connection::CheckClientReachOBufLimits(size_t reply_bytes) {
+  assert(reply_bytes < SIZE_MAX - (1024 * 64));
+  if (reply_bytes == 0 && GetClientType() != kTypeSlave) {
+    return false;
+  }
+  int client_type = 0;
+  if (GetClientType() == kTypeSlave) {
+    client_type = 1;
+  } else if (GetClientType() == kTypePubsub) {
+    client_type = 2;
+  } else {
+    client_type = 0;
+  }
+
+  bool hard = false;
+  bool soft = false;
+  auto hard_limit_bytes = 
GetServer()->GetConfig()->GetClientOutputBufferLimits()[client_type].hard_limit_bytes;
+  auto soft_limit_bytes = 
GetServer()->GetConfig()->GetClientOutputBufferLimits()[client_type].soft_limit_bytes;
+  if (hard_limit_bytes && reply_bytes >= hard_limit_bytes) {
+    hard = true;
+  }
+  if (soft_limit_bytes && reply_bytes >= soft_limit_bytes) {
+    soft = true;
+  }
+
+  if (soft) {
+    if (GetObufSoftLimitReachedTime() == 0) {
+      SetObufSoftLimitReachedTime(util::GetTimeStamp());
+      soft = false;
+    } else {
+      uint64_t elapsed = util::GetTimeStamp() - GetObufSoftLimitReachedTime();
+      if (elapsed <= 
GetServer()->GetConfig()->GetClientOutputBufferLimits()[0].soft_limit_seconds) {
+        soft = false;
+      }
+    }
+  } else {
+    SetObufSoftLimitReachedTime(0);
+  }
+  return hard || soft;
+}
+
+std::string Connection::CheckClientReachOBufLimits(const std::string &msg) {
+  auto memSize = msg.size() + GetOutputBuffer().capacity() + 
evbuffer_get_length(Output());

Review Comment:
   `memSize` -> `mem_size`



##########
src/server/redis_connection.cc:
##########
@@ -559,4 +652,86 @@ void Connection::ResetMultiExec() {
   DisableFlag(Connection::kMultiExec);
 }
 
+bool Connection::CheckClientReachOBufLimits(size_t reply_bytes) {

Review Comment:
   The same here: `O` -> `Output`.



##########
src/server/redis_connection.cc:
##########
@@ -559,4 +652,86 @@ void Connection::ResetMultiExec() {
   DisableFlag(Connection::kMultiExec);
 }
 
+bool Connection::CheckClientReachOBufLimits(size_t reply_bytes) {
+  assert(reply_bytes < SIZE_MAX - (1024 * 64));
+  if (reply_bytes == 0 && GetClientType() != kTypeSlave) {
+    return false;
+  }
+  int client_type = 0;

Review Comment:
   Maybe we can extract this logic to a separate function, introducing 
descriptive constants for 0, 1, 2.



##########
src/server/server.cc:
##########
@@ -839,6 +839,11 @@ void Server::cron() {
       cleanupExitedWorkerThreads(false);
     }
 
+    // check if we need to evict connections every 10s
+    if (counter != 0 && counter % 100 == 0 && config_->max_memory_clients > 0) 
{

Review Comment:
   Can we put the condition `config_->max_memory_clients > 0` first, so that 
short-circuit evaluation skips the other checks when this option is disabled 
in the config?



##########
src/server/redis_connection.h:
##########
@@ -180,6 +172,15 @@ class Connection : public EvbufCallbackBase<Connection> {
   std::set<std::string> watched_keys;
   std::atomic<bool> watched_keys_modified = false;
 
+  bool CheckClientReachOBufLimits(size_t reply_bytes);

Review Comment:
   In all functions of this "family" we can expand the single letter `O` to 
`Output`.



##########
src/config/config_type.h:
##########
@@ -338,3 +338,119 @@ class IntWithUnitField : public ConfigField {
   T max_;
   IntUnit current_unit_ = IntUnit::None;
 };
+
+struct ClientOutputBufferLimitConfig {
+  uint64_t hard_limit_bytes = 0;
+  uint64_t soft_limit_bytes = 0;
+  uint64_t soft_limit_seconds = 0;
+};
+
+constexpr int CLIENT_TYPE_OBUF_COUNT = 3;
+
+class ClientOutputBufferLimitField : public ConfigField {
+ public:
+  ClientOutputBufferLimitField(std::vector<ClientOutputBufferLimitConfig> 
*limits, std::string default_val)
+      : receiver_(limits), default_val_(std::move(default_val)) {
+    if (receiver_->empty()) {
+      receiver_->resize(CLIENT_TYPE_OBUF_COUNT);
+    }
+    CHECK(Set(default_val_));
+    config_type = ConfigType::SingleConfig;
+  }
+  std::string Default() const override { return default_val_; }
+
+  std::string ToString() const override {
+    std::ostringstream ss;
+    for (size_t i = 0; i < receiver_->size(); ++i) {
+      if (i != 0) ss << " ";
+      ss << clientTypeToString(i) << " " << receiver_->at(i).hard_limit_bytes 
<< " "
+         << receiver_->at(i).soft_limit_bytes << " " << 
receiver_->at(i).soft_limit_seconds;
+    }
+    return ss.str();
+  }
+
+  Status Set(const std::string &v) override {
+    std::vector<std::string> tokens = splitString(v, ' ');
+    if (tokens.size() % 4 != 0) {
+      return {Status::NotOK, "Wrong number of arguments in buffer limit 
configuration."};
+    }
+
+    for (size_t i = 0; i < tokens.size(); i += 4) {
+      int class_type = getClientTypeByName(tokens[i]);
+      if (class_type == -1) {
+        return {Status::NotOK, "Invalid client class specified in buffer limit 
configuration."};
+      }
+
+      uint64_t hard_limit = 0, soft_limit = 0;
+      auto s = capacityToInt(tokens[i + 1], hard_limit);
+      if (!s.IsOK()) return s;
+      s = capacityToInt(tokens[i + 2], soft_limit);
+      if (!s.IsOK()) return s;
+      auto [soft_seconds, rest] = GET_OR_RET(TryParseInt(tokens[i + 
3].c_str(), 10));
+      if (*rest != 0) {
+        return {Status::NotOK, "Error in soft_seconds setting in buffer limit 
configuration."};
+      }
+
+      ClientOutputBufferLimitConfig config = {hard_limit, soft_limit, 
static_cast<uint64_t>(soft_seconds)};
+      (*receiver_)[class_type] = config;
+    }
+    return Status::OK();
+  }
+
+ private:
+  static std::string clientTypeToString(size_t type) {
+    switch (type) {
+      case 0:
+        return "normal";
+      case 1:
+        return "replica";
+      case 2:
+        return "pubsub";
+      default:
+        return "unknown";
+    }
+  }
+
+  static std::vector<std::string> splitString(const std::string &str, char 
delim) {
+    std::vector<std::string> tokens;
+    std::string token;
+    std::istringstream token_stream(str);
+    while (std::getline(token_stream, token, delim)) {
+      tokens.push_back(token);
+    }
+    return tokens;
+  }
+
+  static Status capacityToInt(const std::string &str, uint64_t &capacity) {

Review Comment:
   We already have something similar in src/common/parse_util.cc: 
`ParseSizeAndUnit()`. Can it serve your purpose here?



##########
src/server/redis_connection.h:
##########
@@ -180,6 +172,15 @@ class Connection : public EvbufCallbackBase<Connection> {
   std::set<std::string> watched_keys;
   std::atomic<bool> watched_keys_modified = false;
 
+  bool CheckClientReachOBufLimits(size_t reply_bytes);
+  std::string CheckClientReachOBufLimits(const std::string &msg);
+  void SetObufSoftLimitReachedTime(int64_t time) { 
obuf_soft_limit_reached_time_ = time; }
+  int64_t GetObufSoftLimitReachedTime() const { return 
obuf_soft_limit_reached_time_; }
+  inline std::string &GetOutputBuffer() { return output_buffer_; }
+  size_t GetConnectionMemoryUsed() const;
+  void SetReachOBufLimit(bool reach) { is_reach_obuf_limit_ = reach; }
+  bool IsReachOBufLimit() const { return is_reach_obuf_limit_; }

Review Comment:
   `IsReachOBufLimit` -> `IsOutputBufferLimitReached`.



##########
src/server/redis_connection.cc:
##########
@@ -559,4 +652,86 @@ void Connection::ResetMultiExec() {
   DisableFlag(Connection::kMultiExec);
 }
 
+bool Connection::CheckClientReachOBufLimits(size_t reply_bytes) {
+  assert(reply_bytes < SIZE_MAX - (1024 * 64));
+  if (reply_bytes == 0 && GetClientType() != kTypeSlave) {
+    return false;
+  }
+  int client_type = 0;
+  if (GetClientType() == kTypeSlave) {
+    client_type = 1;
+  } else if (GetClientType() == kTypePubsub) {
+    client_type = 2;
+  } else {
+    client_type = 0;
+  }
+
+  bool hard = false;
+  bool soft = false;
+  auto hard_limit_bytes = 
GetServer()->GetConfig()->GetClientOutputBufferLimits()[client_type].hard_limit_bytes;
+  auto soft_limit_bytes = 
GetServer()->GetConfig()->GetClientOutputBufferLimits()[client_type].soft_limit_bytes;
+  if (hard_limit_bytes && reply_bytes >= hard_limit_bytes) {
+    hard = true;
+  }
+  if (soft_limit_bytes && reply_bytes >= soft_limit_bytes) {
+    soft = true;
+  }
+
+  if (soft) {
+    if (GetObufSoftLimitReachedTime() == 0) {
+      SetObufSoftLimitReachedTime(util::GetTimeStamp());
+      soft = false;
+    } else {
+      uint64_t elapsed = util::GetTimeStamp() - GetObufSoftLimitReachedTime();
+      if (elapsed <= 
GetServer()->GetConfig()->GetClientOutputBufferLimits()[0].soft_limit_seconds) {
+        soft = false;
+      }
+    }
+  } else {
+    SetObufSoftLimitReachedTime(0);
+  }
+  return hard || soft;
+}
+
+std::string Connection::CheckClientReachOBufLimits(const std::string &msg) {
+  auto memSize = msg.size() + GetOutputBuffer().capacity() + 
evbuffer_get_length(Output());
+  if (CheckClientReachOBufLimits(memSize)) {
+    SetReachOBufLimit(true);
+    return "";
+  }
+  return msg;
+}
+
+size_t Connection::GetConnectionMemoryUsed() const {
+  size_t total_memory = sizeof(*this);  // 包含所有成员变量的静态内存大小

Review Comment:
   Could you please translate the comment into English?



##########
src/server/worker.cc:
##########
@@ -102,8 +102,13 @@ Worker::~Worker() {
 
 void Worker::TimerCB(int, int16_t events) {
   auto config = srv->GetConfig();
-  if (config->timeout == 0) return;
-  KickoutIdleClients(config->timeout);
+  if (config->timeout != 0) {
+    KickoutIdleClients(config->timeout);
+  }
+
+  if (config->IsClientOutputBufferLimitsEnabled()) {
+    KickoutReachOBufLimitsClients();

Review Comment:
   The same here: `O` -> `Output`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to