torwig commented on code in PR #1490:
URL: 
https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1230974595


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of 
range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   Could you simplify the logic like the following:
   
   ```
   redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
   
   for (auto &key : keys_) {
     std::vector<MemberScore> member_scores;
     auto s = zset_db.Pop(key, count_, flag_ == ZSET_MIN, &member_scores);
     if (!s.ok()) {
       return {Status::RedisExecErr, s.ToString()};
     }
   
     if (!member_scores.empty()) {
       ReplyWithMemberScores(member_scores); // construct output and send it to conn_
       return Status::OK();
     }
   }
   
   // no elements found in all keys
   if (!block_ || conn->IsInExec()) {
       // reply with nil array
       return Status::OK();
   }
   
   for (const auto &key : keys_) {
     svr_->BlockOnKey(key, conn_);
   }
       
    // the rest of the `Execute` code with `bev` and setting a timer.
   ```



##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}

Review Comment:
   BTW, did you also add `BZPOPMAX` and `BZPOPMIN`?



##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of 
range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();
     if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+      return Status::OK();  // Error already output
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    if (!block_) {  // ReplyForZPop
+      output->append(redis::MultiLen(member_scores_.size() * 2));
+      for (const auto &ms : member_scores_) {
+        output->append(redis::BulkString(ms.member));
+        output->append(redis::BulkString(util::Float2String(ms.score)));
+      }
+      return Status::OK();
     }
 
-    return Status::OK();
+    if (!member_scores_.empty()) {
+      ReplyForBZPop();
+      return Status::OK();
+    }
+
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    auto bev = conn->GetBufferEvent();
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromMultiZset() {
+    redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+    rocksdb::Status s;
+    for (auto &user_key : keys_) {
+      s = zset_db.Pop(user_key, count_, min_, &member_scores_);
+      if (!s.ok()) {
+        // output here is necessary for block operation to reply error
+        conn_->Reply(redis::Error("ERR " + s.ToString()));
+        break;
+      }
+      if (member_scores_.empty() && block_) {
+        continue;
+      }
+      user_key_ = user_key;
+      break;
+    }
+    return s;
+  }
+
+  void ReplyForBZPop() {
+    std::string output;
+    output.append(redis::MultiLen(member_scores_.size() * 2 + 1));
+    output.append(redis::BulkString(user_key_));
+    for (const auto &ms : member_scores_) {
+      output.append(redis::BulkString(ms.member));
+      output.append(redis::BulkString(util::Float2String(ms.score)));
+    }
+    conn_->Reply(output);
+  }
+
+  void OnWrite(bufferevent *bev) {
+    auto s = TryPopFromMultiZset();
+    if (member_scores_.empty()) {
+      // The connection may be waked up but can't pop from list. For example,
+      // connection A is blocking on list and connection B push a new element
+      // then wake up the connection A, but this element may be token by other 
connection C.
+      // So we need to wait for the wake event again by disabling the WRITE 
event.
+      bufferevent_disable(bev, EV_WRITE);
+      return;
+    }
+    ReplyForBZPop();
+
+    if (timer_) {
+      timer_.reset();
+    }
+
+    unBlockingAll();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+    // We need to manually trigger the read event since we will stop 
processing commands
+    // in connection after the blocking command, so there may have some 
commands to be processed.
+    // Related issue: https://github.com/apache/incubator-kvrocks/issues/831
+    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+  }
+
+  void OnEvent(bufferevent *bev, int16_t events) {
+    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+      if (timer_ != nullptr) {
+        timer_.reset();
+      }
+      unBlockingAll();
+    }
+    conn_->OnEvent(bev, events);
+  }
+
+  void TimerCB(int, int16_t events) {
+    conn_->Reply(redis::NilString());
+    timer_.reset();
+    unBlockingAll();
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
   }
 
  private:
   bool min_;
+  bool block_;
   int count_ = 1;
+  int timeout_;
+  std::vector<std::string> keys_;
+  Server *svr_ = nullptr;
+  Connection *conn_ = nullptr;
+  UniqueEvent timer_;
+  std::string user_key_;

Review Comment:
   Also, you can remove `user_key_` and `member_scores_` from the command class members.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to