torwig commented on code in PR #1490:
URL: 
https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1229777929


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {

Review Comment:
   @Yangsx-1 Thank you for the changes you've made. I'll review them later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to