torwig commented on code in PR #1490:
URL:
https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226502426
##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
- redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+ svr_ = svr;
+ conn_ = conn;
+
+ if (!block_) {
+ auto s = TryPopFromZset();
+ return Status::OK();
+ }
+
+ auto bev = conn->GetBufferEvent();
+
+ auto s = TryPopFromZset();
+
+ if (!s.ok() || reply_flag_) {
+ return Status::OK(); // error has already output or result has already
output
+ }
+
+ // All Empty
+ if (conn->IsInExec()) {
+ *output = redis::MultiLen(-1);
+ return Status::OK(); // No blocking in multi-exec
+ }
+
+ for (const auto &key : keys_) {
+ svr_->BlockOnKey(key, conn_);
+ }
+
+ SetCB(bev);
+
+ if (timeout_) {
+ timer_.reset(NewTimer(bufferevent_get_base(bev)));
+ timeval tm = {timeout_, 0};
+ evtimer_add(timer_.get(), &tm);
+ }
+
+ return {Status::BlockingCmd};
+ }
+
+ rocksdb::Status TryPopFromZset() {
+ redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+ rocksdb::Status s;
+ std::string output;
for (auto &user_key : keys_) {
std::vector<MemberScore> member_scores;
- auto s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN,
&member_scores);
+ s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
if (!s.ok()) {
- return {Status::RedisExecErr, s.ToString()};
+ conn_->Reply(redis::Error("ERR " + s.ToString()));
+ break;
}
if (member_scores.empty()) {
continue;
}
-
- output->append(redis::MultiLen(2));
- output->append(redis::BulkString(user_key));
- output->append(redis::MultiLen(member_scores.size() * 2));
+ output.append(redis::MultiLen(2));
+ output.append(redis::BulkString(user_key));
+ output.append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
- output->append(redis::BulkString(ms.member));
- output->append(redis::BulkString(util::Float2String(ms.score)));
+ output.append(redis::BulkString(ms.member));
+ output.append(redis::BulkString(util::Float2String(ms.score)));
}
- return Status::OK();
+ reply_flag_ = true;
+ break;
}
- *output = redis::NilString();
- return Status::OK();
+ if (output.empty() && !block_) {
+ output = redis::NilString();
+ }
+ conn_->Reply(output);
+ return s;
+ }
+
+ void OnWrite(bufferevent *bev) {
+ auto s = TryPopFromZset();
+ if (!s.ok() || !reply_flag_) {
Review Comment:
In the case of an error inside `TryPopFromZset`, the function itself sends the
error reply, and then the connection will presumably stay blocked. Am I right?
##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
RangeLexSpec spec_;
};
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+ private EvbufCallbackBase<CommandZPop, false>,
+ private EventCallbackBase<CommandZPop> {
public:
- explicit CommandZPop(bool min) : min_(min) {}
+ explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
Status Parse(const std::vector<std::string> &args) override {
- if (args.size() > 3) {
- return {Status::RedisParseErr, errWrongNumOfArguments};
- }
+ if (!block_) {
+ if (args.size() > 3) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+
+ if (args.size() == 3) {
+ auto parse_result = ParseInt<int>(args[2], 10);
+ if (!parse_result) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+
+ count_ = *parse_result;
+ }
+ keys_.push_back(args[1]);
+ return Commander::Parse(args);
+ } else {
+ auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
- if (args.size() == 3) {
- auto parse_result = ParseInt<int>(args[2], 10);
if (!parse_result) {
- return {Status::RedisParseErr, errValueNotInteger};
+ return {Status::RedisParseErr, "timeout is not an integer or out of
range"};
+ }
+
+ if (*parse_result < 0) {
+ return {Status::RedisParseErr, "timeout should not be negative"};
}
- count_ = *parse_result;
+ timeout_ = *parse_result;
+
+ keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+ return Commander::Parse(args);
}
- return Commander::Parse(args);
}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
- redis::ZSet zset_db(svr->storage, conn->GetNamespace());
- std::vector<MemberScore> member_scores;
- auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
- if (!s.ok()) {
- return {Status::RedisExecErr, s.ToString()};
+ svr_ = svr;
+ conn_ = conn;
+
+ if (!block_) {
+ auto s = TryPopFromZset();
+ return Status::OK();
}
- output->append(redis::MultiLen(member_scores.size() * 2));
- for (const auto &ms : member_scores) {
- output->append(redis::BulkString(ms.member));
- output->append(redis::BulkString(util::Float2String(ms.score)));
+ auto bev = conn->GetBufferEvent();
+
+ auto s = TryPopFromZset();
+
+ if (!s.ok() || reply_flag_) {
+ return Status::OK(); // error has already output or result has already
output
}
- return Status::OK();
+ // All Empty
+ if (conn->IsInExec()) {
+ *output = redis::MultiLen(-1);
+ return Status::OK(); // No blocking in multi-exec
+ }
+
+ for (const auto &key : keys_) {
+ svr_->BlockOnKey(key, conn_);
+ }
+
+ SetCB(bev);
+
+ if (timeout_) {
+ timer_.reset(NewTimer(bufferevent_get_base(bev)));
+ timeval tm = {timeout_, 0};
+ evtimer_add(timer_.get(), &tm);
+ }
+
+ return {Status::BlockingCmd};
+ }
+
+ rocksdb::Status TryPopFromZset() {
Review Comment:
This function appears twice in this file. Perhaps something could be done to
remove or minimize the duplication.
##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
RangeLexSpec spec_;
};
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+ private EvbufCallbackBase<CommandZPop, false>,
+ private EventCallbackBase<CommandZPop> {
public:
- explicit CommandZPop(bool min) : min_(min) {}
+ explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
Status Parse(const std::vector<std::string> &args) override {
- if (args.size() > 3) {
- return {Status::RedisParseErr, errWrongNumOfArguments};
- }
+ if (!block_) {
+ if (args.size() > 3) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+
+ if (args.size() == 3) {
+ auto parse_result = ParseInt<int>(args[2], 10);
+ if (!parse_result) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+
+ count_ = *parse_result;
+ }
+ keys_.push_back(args[1]);
+ return Commander::Parse(args);
+ } else {
Review Comment:
You can drop the `else` here, since the `if` branch always ends with a
`return` - it saves a level of indentation.
##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
- redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+ svr_ = svr;
+ conn_ = conn;
+
+ if (!block_) {
+ auto s = TryPopFromZset();
+ return Status::OK();
+ }
+
+ auto bev = conn->GetBufferEvent();
+
+ auto s = TryPopFromZset();
+
+ if (!s.ok() || reply_flag_) {
+ return Status::OK(); // error has already output or result has already
output
+ }
+
+ // All Empty
+ if (conn->IsInExec()) {
+ *output = redis::MultiLen(-1);
+ return Status::OK(); // No blocking in multi-exec
+ }
+
+ for (const auto &key : keys_) {
+ svr_->BlockOnKey(key, conn_);
+ }
+
+ SetCB(bev);
+
+ if (timeout_) {
+ timer_.reset(NewTimer(bufferevent_get_base(bev)));
+ timeval tm = {timeout_, 0};
+ evtimer_add(timer_.get(), &tm);
+ }
+
+ return {Status::BlockingCmd};
+ }
+
+ rocksdb::Status TryPopFromZset() {
Review Comment:
Actually, the function pops from multiple sorted sets, which is not expressed
by its name. And, besides popping elements, it also sends a reply. It's better
to separate concerns: pop and return elements in one function, and then analyze
(if something was popped/if the mode is blocking/etc.) and send the response in
another function. Let each function do just one thing.
##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
- redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+ svr_ = svr;
+ conn_ = conn;
+
+ if (!block_) {
+ auto s = TryPopFromZset();
+ return Status::OK();
+ }
+
+ auto bev = conn->GetBufferEvent();
Review Comment:
You can move this line closer to the place where `bev` is used, e.g. before
the `SetCB(bev);` line.
##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
RangeLexSpec spec_;
};
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+ private EvbufCallbackBase<CommandZPop, false>,
+ private EventCallbackBase<CommandZPop> {
public:
- explicit CommandZPop(bool min) : min_(min) {}
+ explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
Status Parse(const std::vector<std::string> &args) override {
- if (args.size() > 3) {
- return {Status::RedisParseErr, errWrongNumOfArguments};
- }
+ if (!block_) {
+ if (args.size() > 3) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+
+ if (args.size() == 3) {
+ auto parse_result = ParseInt<int>(args[2], 10);
+ if (!parse_result) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+
+ count_ = *parse_result;
+ }
+ keys_.push_back(args[1]);
+ return Commander::Parse(args);
+ } else {
+ auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
- if (args.size() == 3) {
- auto parse_result = ParseInt<int>(args[2], 10);
if (!parse_result) {
- return {Status::RedisParseErr, errValueNotInteger};
+ return {Status::RedisParseErr, "timeout is not an integer or out of
range"};
+ }
+
+ if (*parse_result < 0) {
+ return {Status::RedisParseErr, "timeout should not be negative"};
}
- count_ = *parse_result;
+ timeout_ = *parse_result;
+
+ keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+ return Commander::Parse(args);
}
- return Commander::Parse(args);
}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
- redis::ZSet zset_db(svr->storage, conn->GetNamespace());
- std::vector<MemberScore> member_scores;
- auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
- if (!s.ok()) {
- return {Status::RedisExecErr, s.ToString()};
+ svr_ = svr;
+ conn_ = conn;
+
+ if (!block_) {
+ auto s = TryPopFromZset();
Review Comment:
If `!block_`, you call `TryPopFromZset`, and if `block_`, you call
`TryPopFromZset` as well (identical actions on the `if` path and on the
fall-through path).
I'm sure you will refactor this after you refactor/split the
`TryPopFromZset` function.
##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
- redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+ svr_ = svr;
+ conn_ = conn;
+
+ if (!block_) {
+ auto s = TryPopFromZset();
+ return Status::OK();
+ }
+
+ auto bev = conn->GetBufferEvent();
+
+ auto s = TryPopFromZset();
+
+ if (!s.ok() || reply_flag_) {
+ return Status::OK(); // error has already output or result has already
output
+ }
+
+ // All Empty
+ if (conn->IsInExec()) {
+ *output = redis::MultiLen(-1);
+ return Status::OK(); // No blocking in multi-exec
+ }
+
+ for (const auto &key : keys_) {
+ svr_->BlockOnKey(key, conn_);
+ }
+
+ SetCB(bev);
+
+ if (timeout_) {
+ timer_.reset(NewTimer(bufferevent_get_base(bev)));
+ timeval tm = {timeout_, 0};
+ evtimer_add(timer_.get(), &tm);
+ }
+
+ return {Status::BlockingCmd};
+ }
+
+ rocksdb::Status TryPopFromZset() {
+ redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+ rocksdb::Status s;
+ std::string output;
for (auto &user_key : keys_) {
std::vector<MemberScore> member_scores;
- auto s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN,
&member_scores);
+ s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
if (!s.ok()) {
- return {Status::RedisExecErr, s.ToString()};
+ conn_->Reply(redis::Error("ERR " + s.ToString()));
Review Comment:
In case of an error inside `Pop`, you reply with the error message and
then just `break`, which only exits the `for` loop. After that, there is a
chance that you will also reply with a nil-string or with whatever data is in
`output`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]