This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5c9137c5 Add BlockingCommander to refactor all blocking commands
(#1757)
5c9137c5 is described below
commit 5c9137c56703528bb164d1cc0f9bbcd5d0079ec1
Author: Twice <[email protected]>
AuthorDate: Wed Sep 13 14:37:27 2023 +0900
Add BlockingCommander to refactor all blocking commands (#1757)
---
src/commands/blocking_commander.h | 127 ++++++++++++++++++++++++++++
src/commands/cmd_list.cc | 158 ++++++----------------------------
src/commands/cmd_zset.cc | 173 ++++++++------------------------------
3 files changed, 189 insertions(+), 269 deletions(-)
diff --git a/src/commands/blocking_commander.h
b/src/commands/blocking_commander.h
new file mode 100644
index 00000000..537e770f
--- /dev/null
+++ b/src/commands/blocking_commander.h
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "commander.h"
+#include "event_util.h"
+#include "server/redis_connection.h"
+
+namespace redis {
+
+class BlockingCommander : public Commander,
+ private EvbufCallbackBase<BlockingCommander, false>,
+ private EventCallbackBase<BlockingCommander> {
+ public:
+ // method to reply when no operation happens
+ virtual std::string NoopReply() = 0;
+
+ // method to block keys
+ virtual void BlockKeys() = 0;
+
+ // method to unblock keys
+ virtual void UnblockKeys() = 0;
+
+ // method to access database in write callback
+ // the return value indicates if the real database operation happens
+ // in other words, returning true indicates ending the blocking
+ virtual bool OnBlockingWrite() = 0;
+
+ // to start the blocking process
+ // usually put to the end of the Execute method
+ Status StartBlocking(int64_t timeout, std::string *output) {
+ if (conn_->IsInExec()) {
+ *output = NoopReply();
+ return Status::OK(); // no blocking in multi-exec
+ }
+
+ BlockKeys();
+ SetCB(conn_->GetBufferEvent());
+
+ if (timeout) {
+ InitTimer(timeout);
+ }
+
+ return {Status::BlockingCmd};
+ }
+
+ void OnWrite(bufferevent *bev) {
+ bool done = OnBlockingWrite();
+
+ if (!done) {
+ // The connection may be waked up but can't pop from the datatype.
+ // For example, connection A is blocked on it and connection B added a
new element;
+ // then connection A was unblocked, but this element may be taken by
+ // another connection C. So we need to block connection A again
+ // and wait for the element being added by disabling the WRITE event.
+ bufferevent_disable(bev, EV_WRITE);
+ return;
+ }
+
+ if (timer_) {
+ timer_.reset();
+ }
+
+ UnblockKeys();
+ conn_->SetCB(bev);
+ bufferevent_enable(bev, EV_READ);
+ // We need to manually trigger the read event since we will stop
processing commands
+ // in connection after the blocking command, so there may have some
commands to be processed.
+ // Related issue: https://github.com/apache/kvrocks/issues/831
+ bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+ }
+
+ void OnEvent(bufferevent *bev, int16_t events) {
+ if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+ if (timer_ != nullptr) {
+ timer_.reset();
+ }
+ UnblockKeys();
+ }
+ conn_->OnEvent(bev, events);
+ }
+
+ // Usually put to the top of the Execute method
+ void InitConnection(Connection *conn) { conn_ = conn; }
+
+ void InitTimer(int64_t timeout) {
+ auto bev = conn_->GetBufferEvent();
+ timer_.reset(NewTimer(bufferevent_get_base(bev)));
+ int64_t timeout_second = timeout / 1000 / 1000;
+ int64_t timeout_microsecond = timeout % (1000 * 1000);
+ timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
+ evtimer_add(timer_.get(), &tm);
+ }
+
+ void TimerCB(int, int16_t) {
+ conn_->Reply(NoopReply());
+ timer_.reset();
+ UnblockKeys();
+ auto bev = conn_->GetBufferEvent();
+ conn_->SetCB(bev);
+ bufferevent_enable(bev, EV_READ);
+ }
+
+ protected:
+ Connection *conn_ = nullptr;
+ UniqueEvent timer_;
+};
+
+} // namespace redis
diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index 82e3e091..ad5a50db 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -19,9 +19,11 @@
*/
#include "commander.h"
+#include "commands/blocking_commander.h"
#include "commands/command_parser.h"
#include "error_constants.h"
#include "event_util.h"
+#include "server/redis_reply.h"
#include "server/server.h"
#include "types/redis_list.h"
@@ -232,9 +234,7 @@ class CommandLMPop : public Commander {
std::vector<std::string> keys_;
};
-class CommandBPop : public Commander,
- private EvbufCallbackBase<CommandBPop, false>,
- private EventCallbackBase<CommandBPop> {
+class CommandBPop : public BlockingCommander {
public:
explicit CommandBPop(bool left) : left_(left) {}
@@ -261,34 +261,26 @@ class CommandBPop : public Commander,
Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
- conn_ = conn;
+ InitConnection(conn);
- auto bev = conn->GetBufferEvent();
auto s = TryPopFromList();
if (s.ok() || !s.IsNotFound()) {
return Status::OK(); // error has already output in TryPopFromList
}
- if (conn->IsInExec()) {
- *output = redis::MultiLen(-1);
- return Status::OK(); // No blocking in multi-exec
- }
+ return StartBlocking(timeout_, output);
+ }
+ void BlockKeys() override {
for (const auto &key : keys_) {
svr_->BlockOnKey(key, conn_);
}
+ }
- SetCB(bev);
-
- if (timeout_) {
- timer_.reset(NewTimer(bufferevent_get_base(bev)));
- int64_t timeout_second = timeout_ / 1000 / 1000;
- int64_t timeout_microsecond = timeout_ % (1000 * 1000);
- timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
- evtimer_add(timer_.get(), &tm);
+ void UnblockKeys() override {
+ for (const auto &key : keys_) {
+ svr_->UnblockOnKey(key, conn_);
}
-
- return {Status::BlockingCmd};
}
rocksdb::Status TryPopFromList() {
@@ -318,62 +310,18 @@ class CommandBPop : public Commander,
return s;
}
- void OnWrite(bufferevent *bev) {
+ bool OnBlockingWrite() override {
auto s = TryPopFromList();
- if (s.IsNotFound()) {
- // The connection may be waked up but can't pop from list. For example,
- // connection A is blocking on list and connection B push a new element
- // then wake up the connection A, but this element may be token by other
connection C.
- // So we need to wait for the wake event again by disabling the WRITE
event.
- bufferevent_disable(bev, EV_WRITE);
- return;
- }
-
- if (timer_) {
- timer_.reset();
- }
-
- unBlockingAll();
- conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
- // We need to manually trigger the read event since we will stop
processing commands
- // in connection after the blocking command, so there may have some
commands to be processed.
- // Related issue: https://github.com/apache/kvrocks/issues/831
- bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+ return !s.IsNotFound();
}
- void OnEvent(bufferevent *bev, int16_t events) {
- if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
- if (timer_ != nullptr) {
- timer_.reset();
- }
- unBlockingAll();
- }
- conn_->OnEvent(bev, events);
- }
-
- void TimerCB(int, int16_t events) {
- conn_->Reply(redis::NilString());
- timer_.reset();
- unBlockingAll();
- auto bev = conn_->GetBufferEvent();
- conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
- }
+ std::string NoopReply() override { return redis::NilString(); }
private:
bool left_ = false;
int64_t timeout_ = 0; // microseconds
std::vector<std::string> keys_;
Server *svr_ = nullptr;
- Connection *conn_ = nullptr;
- UniqueEvent timer_;
-
- void unBlockingAll() {
- for (const auto &key : keys_) {
- svr_->UnblockOnKey(key, conn_);
- }
- }
};
class CommandBLPop : public CommandBPop {
@@ -632,9 +580,7 @@ class CommandLMove : public Commander {
bool dst_left_;
};
-class CommandBLMove : public Commander,
- private EvbufCallbackBase<CommandBLMove, false>,
- private EventCallbackBase<CommandBLMove> {
+class CommandBLMove : public BlockingCommander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto arg_val = util::ToLower(args_[3]);
@@ -663,7 +609,7 @@ class CommandBLMove : public Commander,
Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
- conn_ = conn;
+ InitConnection(conn);
redis::List list_db(svr->storage, conn->GetNamespace());
std::string elem;
@@ -676,87 +622,37 @@ class CommandBLMove : public Commander,
return Status::OK();
}
- if (conn->IsInExec()) {
- *output = redis::MultiLen(-1);
- return Status::OK(); // no blocking in multi-exec
- }
-
- svr_->BlockOnKey(args_[1], conn_);
- auto bev = conn->GetBufferEvent();
- SetCB(bev);
+ return StartBlocking(timeout_, output);
+ }
- if (timeout_) {
- timer_.reset(NewTimer(bufferevent_get_base(bev)));
- int64_t timeout_second = timeout_ / 1000 / 1000;
- int64_t timeout_microsecond = timeout_ % (1000 * 1000);
- timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
- evtimer_add(timer_.get(), &tm);
- }
+ void BlockKeys() override { svr_->BlockOnKey(args_[1], conn_); }
- return {Status::BlockingCmd};
- }
+ void UnblockKeys() override { svr_->UnblockOnKey(args_[1], conn_); }
- void OnWrite(bufferevent *bev) {
+ bool OnBlockingWrite() override {
redis::List list_db(svr_->storage, conn_->GetNamespace());
std::string elem;
auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem);
if (!s.ok() && !s.IsNotFound()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
- return;
+ return true;
}
- if (elem.empty()) {
- // The connection may be waked up but can't pop from a zset. For
example, connection A is blocked on zset and
- // connection B added a new element; then connection A was unblocked,
but this element may be taken by
- // another connection C. So we need to block connection A again and wait
for the element being added
- // by disabling the WRITE event.
- bufferevent_disable(bev, EV_WRITE);
- return;
+ bool empty = elem.empty();
+ if (!empty) {
+ conn_->Reply(redis::BulkString(elem));
}
- conn_->Reply(redis::BulkString(elem));
-
- if (timer_) {
- timer_.reset();
- }
-
- unblockOnSrc();
- conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
- // We need to manually trigger the read event since we will stop
processing commands
- // in connection after the blocking command, so there may have some
commands to be processed.
- // Related issue: https://github.com/apache/kvrocks/issues/831
- bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+ return !empty;
}
- void OnEvent(bufferevent *bev, int16_t events) {
- if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
- if (timer_ != nullptr) {
- timer_.reset();
- }
- unblockOnSrc();
- }
- conn_->OnEvent(bev, events);
- }
-
- void TimerCB(int, int16_t) {
- conn_->Reply(redis::MultiLen(-1));
- timer_.reset();
- unblockOnSrc();
- auto bev = conn_->GetBufferEvent();
- conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
- }
+ std::string NoopReply() override { return redis::MultiLen(-1); }
private:
bool src_left_;
bool dst_left_;
int64_t timeout_ = 0; // microseconds
Server *svr_ = nullptr;
- Connection *conn_ = nullptr;
- UniqueEvent timer_;
-
- void unblockOnSrc() { svr_->UnblockOnKey(args_[1], conn_); }
};
class CommandLPos : public Commander {
diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc
index 52c9a3d4..41493b6e 100644
--- a/src/commands/cmd_zset.cc
+++ b/src/commands/cmd_zset.cc
@@ -22,6 +22,7 @@
#include "command_parser.h"
#include "commander.h"
+#include "commands/blocking_commander.h"
#include "commands/scan_base.h"
#include "error_constants.h"
#include "server/redis_reply.h"
@@ -293,9 +294,7 @@ static rocksdb::Status PopFromMultipleZsets(redis::ZSet
*zset_db, const std::vec
return rocksdb::Status::OK();
}
-class CommandBZPop : public Commander,
- private EvbufCallbackBase<CommandBZPop, false>,
- private EventCallbackBase<CommandBZPop> {
+class CommandBZPop : public BlockingCommander {
public:
explicit CommandBZPop(bool min) : min_(min) {}
@@ -315,7 +314,7 @@ class CommandBZPop : public Commander,
Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
- conn_ = conn;
+ InitConnection(conn);
std::string user_key;
std::vector<MemberScore> member_scores;
@@ -331,28 +330,21 @@ class CommandBZPop : public Commander,
return Status::OK();
}
- // all sorted sets are empty
- if (conn->IsInExec()) {
- *output = redis::MultiLen(-1);
- return Status::OK(); // no blocking in multi-exec
- }
+ return StartBlocking(timeout_, output);
+ }
+
+ std::string NoopReply() override { return redis::MultiLen(-1); }
+ void BlockKeys() override {
for (const auto &key : keys_) {
svr_->BlockOnKey(key, conn_);
}
+ }
- auto bev = conn->GetBufferEvent();
- SetCB(bev);
-
- if (timeout_) {
- timer_.reset(NewTimer(bufferevent_get_base(bev)));
- int64_t timeout_second = timeout_ / 1000 / 1000;
- int64_t timeout_microsecond = timeout_ % (1000 * 1000);
- timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
- evtimer_add(timer_.get(), &tm);
+ void UnblockKeys() override {
+ for (const auto &key : keys_) {
+ svr_->UnblockOnKey(key, conn_);
}
-
- return {Status::BlockingCmd};
}
void SendMembersWithScores(const std::vector<MemberScore> &member_scores,
const std::string &user_key) {
@@ -366,7 +358,7 @@ class CommandBZPop : public Commander,
conn_->Reply(output);
}
- void OnWrite(bufferevent *bev) {
+ bool OnBlockingWrite() override {
std::string user_key;
std::vector<MemberScore> member_scores;
@@ -374,50 +366,15 @@ class CommandBZPop : public Commander,
auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key,
&member_scores);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
- return;
- }
-
- if (member_scores.empty()) {
- // The connection may be waked up but can't pop from a zset. For
example, connection A is blocked on zset and
- // connection B added a new element; then connection A was unblocked,
but this element may be taken by
- // another connection C. So we need to block connection A again and wait
for the element being added
- // by disabling the WRITE event.
- bufferevent_disable(bev, EV_WRITE);
- return;
+ return true;
}
- SendMembersWithScores(member_scores, user_key);
-
- if (timer_) {
- timer_.reset();
- }
-
- unblockOnAllKeys();
- conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
- // We need to manually trigger the read event since we will stop
processing commands
- // in connection after the blocking command, so there may have some
commands to be processed.
- // Related issue: https://github.com/apache/kvrocks/issues/831
- bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
- }
-
- void OnEvent(bufferevent *bev, int16_t events) {
- if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
- if (timer_ != nullptr) {
- timer_.reset();
- }
- unblockOnAllKeys();
+ bool empty = member_scores.empty();
+ if (!empty) {
+ SendMembersWithScores(member_scores, user_key);
}
- conn_->OnEvent(bev, events);
- }
- void TimerCB(int, int16_t) {
- conn_->Reply(redis::MultiLen(-1));
- timer_.reset();
- unblockOnAllKeys();
- auto bev = conn_->GetBufferEvent();
- conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
+ return !empty;
}
private:
@@ -425,14 +382,6 @@ class CommandBZPop : public Commander,
int64_t timeout_ = 0; // microseconds
std::vector<std::string> keys_;
Server *svr_ = nullptr;
- Connection *conn_ = nullptr;
- UniqueEvent timer_;
-
- void unblockOnAllKeys() {
- for (const auto &key : keys_) {
- svr_->UnblockOnKey(key, conn_);
- }
- }
};
class CommandBZPopMin : public CommandBZPop {
@@ -518,9 +467,7 @@ class CommandZMPop : public Commander {
int count_ = 0;
};
-class CommandBZMPop : public Commander,
- private EvbufCallbackBase<CommandBZMPop, false>,
- private EventCallbackBase<CommandBZMPop> {
+class CommandBZMPop : public BlockingCommander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
@@ -557,7 +504,7 @@ class CommandBZMPop : public Commander,
Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
- conn_ = conn;
+ InitConnection(conn);
std::string user_key;
std::vector<MemberScore> member_scores;
@@ -573,31 +520,24 @@ class CommandBZMPop : public Commander,
return Status::OK();
}
- // all sorted sets are empty
- if (conn->IsInExec()) {
- *output = redis::MultiLen(-1);
- return Status::OK(); // no blocking in multi-exec
- }
+ return StartBlocking(timeout_, output);
+ }
+ void BlockKeys() override {
for (const auto &key : keys_) {
svr_->BlockOnKey(key, conn_);
}
+ }
- auto bev = conn->GetBufferEvent();
- SetCB(bev);
-
- if (timeout_) {
- timer_.reset(NewTimer(bufferevent_get_base(bev)));
- int64_t timeout_second = timeout_ / 1000 / 1000;
- int64_t timeout_microsecond = timeout_ % (1000 * 1000);
- timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
- evtimer_add(timer_.get(), &tm);
+ void UnblockKeys() override {
+ for (const auto &key : keys_) {
+ svr_->UnblockOnKey(key, conn_);
}
-
- return {Status::BlockingCmd};
}
- void OnWrite(bufferevent *bev) {
+ std::string NoopReply() override { return redis::NilString(); }
+
+ bool OnBlockingWrite() override {
std::string user_key;
std::vector<MemberScore> member_scores;
@@ -605,50 +545,15 @@ class CommandBZMPop : public Commander,
auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_,
&user_key, &member_scores);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
- return;
- }
-
- if (member_scores.empty()) {
- // The connection may be waked up but can't pop from a zset. For
example, connection A is blocked on zset and
- // connection B added a new element; then connection A was unblocked,
but this element may be taken by
- // another connection C. So we need to block connection A again and wait
for the element being added
- // by disabling the WRITE event.
- bufferevent_disable(bev, EV_WRITE);
- return;
+ return true;
}
- SendMembersWithScoresForZMpop(conn_, user_key, member_scores);
-
- if (timer_) {
- timer_.reset();
- }
-
- unblockOnAllKeys();
- conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
- // We need to manually trigger the read event since we will stop
processing commands
- // in connection after the blocking command, so there may have some
commands to be processed.
- // Related issue: https://github.com/apache/kvrocks/issues/831
- bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
- }
-
- void OnEvent(bufferevent *bev, int16_t events) {
- if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
- if (timer_ != nullptr) {
- timer_.reset();
- }
- unblockOnAllKeys();
+ bool empty = member_scores.empty();
+ if (!empty) {
+ SendMembersWithScoresForZMpop(conn_, user_key, member_scores);
}
- conn_->OnEvent(bev, events);
- }
- void TimerCB(int, int16_t events) {
- conn_->Reply(redis::NilString());
- timer_.reset();
- unblockOnAllKeys();
- auto bev = conn_->GetBufferEvent();
- conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
+ return !empty;
}
static CommandKeyRange Range(const std::vector<std::string> &args) {
@@ -663,14 +568,6 @@ class CommandBZMPop : public Commander,
enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE;
int count_ = 0;
Server *svr_ = nullptr;
- Connection *conn_ = nullptr;
- UniqueEvent timer_;
-
- void unblockOnAllKeys() {
- for (const auto &key : keys_) {
- svr_->UnblockOnKey(key, conn_);
- }
- }
};
class CommandZRangeStore : public Commander {