Yangsx-1 commented on code in PR #1490:
URL: 
https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226788186


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of 
range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already 
output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {

Review Comment:
   I can split the func `TryPopFromZset`, but it is a little difficult and 
tedious to merge `BZMPOP` and `BZPOPMIN/MAX` together. The ways they parse 
arguments and build reply messages are very different. The only thing they 
have in common in the func `TryPopFromZset` is that they pop from the 
database. The names are the same, but maybe it's a bad name. :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to