This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 711e41b6 Add the support of the BLMPOP command (#1774)
711e41b6 is described below

commit 711e41b688e90c3b405207f897b49249057650bd
Author: HolyLow <[email protected]>
AuthorDate: Fri Sep 22 20:46:02 2023 +0800

    Add the support of the BLMPOP command (#1774)
---
 src/commands/cmd_list.cc                 | 124 +++++++++++++
 tests/gocase/unit/type/list/list_test.go | 305 +++++++++++++++++++++++++++++++
 tests/gocase/util/tcp_client.go          |  15 ++
 3 files changed, 444 insertions(+)

diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index ad5a50db..a9612351 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -334,6 +334,129 @@ class CommandBRPop : public CommandBPop {
   CommandBRPop() : CommandBPop(false) {}
 };
 
+// BLMPOP: pops up to `count` elements from the first list, in argument order,
+// that yields elements. When every key is reported as not found, the command
+// hands over to BlockingCommander::StartBlocking() with the parsed timeout.
+class CommandBLMPop : public BlockingCommander {
+ public:
+  CommandBLMPop() = default;
+  CommandBLMPop(const CommandBLMPop &) = delete;
+  CommandBLMPop &operator=(const CommandBLMPop &) = delete;
+
+  ~CommandBLMPop() override = default;
+
+  // format: BLMPOP timeout #numkeys key0 [key1 ...] <LEFT | RIGHT> [COUNT count]
+  //
+  // Returns a parse error when:
+  //  - timeout is negative;
+  //  - numkeys is 0, or larger than the number of arguments that can hold keys;
+  //  - the direction token is neither LEFT nor RIGHT (case-insensitive);
+  //  - COUNT is 0, is given more than once, or any other trailing token appears.
+  //
+  // numkeys == 0 must be rejected here: with no keys, ExecuteUnblocked() ends
+  // with an OK status and no popped elements, and therefore writes no reply.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+
+    auto timeout = GET_OR_RET(parser.TakeFloat());
+    if (timeout < 0) {
+      return {Status::RedisParseErr, "timeout is negative"};
+    }
+    timeout_ = static_cast<int64_t>(timeout * 1000 * 1000);  // seconds -> microseconds
+
+    auto num_keys = GET_OR_RET(parser.TakeInt<uint32_t>());
+    if (num_keys == 0) {
+      return {Status::RedisParseErr, "numkeys should be greater than 0"};
+    }
+    // Layout is: name, timeout, numkeys, <numkeys keys>, direction, ... so at
+    // least numkeys + 4 arguments are required. Checking this before reserve()
+    // keeps a bogus client-supplied numkeys from requesting a huge allocation.
+    if (static_cast<uint64_t>(num_keys) + 4 > args.size()) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+    keys_.clear();
+    keys_.reserve(num_keys);
+    for (uint32_t i = 0; i < num_keys; ++i) {
+      keys_.emplace_back(GET_OR_RET(parser.TakeStr()));
+    }
+
+    auto left_or_right = util::ToLower(GET_OR_RET(parser.TakeStr()));
+    if (left_or_right == "left") {
+      left_ = true;
+    } else if (left_or_right == "right") {
+      left_ = false;
+    } else {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    // COUNT is optional and may appear at most once; it defaults to 1.
+    count_ = 1;
+    bool count_seen = false;
+    while (parser.Good()) {
+      if (parser.EatEqICase("count") && !count_seen) {
+        count_ = GET_OR_RET(parser.TakeInt<uint32_t>());
+        if (count_ == 0) {
+          return {Status::RedisParseErr, "count should be greater than 0"};
+        }
+        count_seen = true;
+      } else {
+        return parser.InvalidSyntax();
+      }
+    }
+
+    return Status::OK();
+  }
+
+  // Tries the pop once; falls back to blocking only when the attempt ended in
+  // NotFound. In every other case ExecuteUnblocked() has already replied.
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    svr_ = svr;
+    InitConnection(conn);
+
+    auto s = ExecuteUnblocked();
+    if (!s.IsNotFound()) {
+      return Status::OK();  // result or error has already been output
+    }
+
+    return StartBlocking(timeout_, output);
+  }
+
+  // Walks keys_ in order and pops from the first key that yields elements.
+  // Replies on the connection with [key, [elems...]] on success, or with an
+  // error for any non-NotFound failure. Returns the status of the last
+  // PopMulti() call so callers can tell "nothing to pop" (NotFound) apart.
+  // NOTE(review): an OK status with no elements sends no reply at all; Parse()
+  // rejects the inputs expected to cause that (numkeys 0, COUNT 0).
+  rocksdb::Status ExecuteUnblocked() {
+    redis::List list_db(svr_->storage, conn_->GetNamespace());
+    std::vector<std::string> elems;
+    std::string chosen_key;
+    rocksdb::Status s;
+    for (const auto &key : keys_) {
+      s = list_db.PopMulti(key, left_, count_, &elems);
+      if (s.ok() && !elems.empty()) {
+        chosen_key = key;
+        break;
+      }
+      // Only NotFound lets us move on to the next key; anything else stops the scan.
+      if (!s.IsNotFound()) {
+        break;
+      }
+    }
+
+    if (s.ok()) {
+      if (!elems.empty()) {
+        conn_->GetServer()->UpdateWatchedKeysManually({chosen_key});
+        std::string elems_bulk = redis::MultiBulkString(elems);
+        conn_->Reply(redis::Array({redis::BulkString(chosen_key), std::move(elems_bulk)}));
+      }
+    } else if (!s.IsNotFound()) {
+      conn_->Reply(redis::Error("ERR " + s.ToString()));
+    }
+
+    return s;
+  }
+
+  // Registers the connection with the server for every requested key.
+  void BlockKeys() override {
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+  }
+
+  // Reverses BlockKeys() for every requested key.
+  void UnblockKeys() override {
+    for (const auto &key : keys_) {
+      svr_->UnblockOnKey(key, conn_);
+    }
+  }
+
+  // Retries the pop; returns true unless the attempt still ended in NotFound.
+  // Presumably called by BlockingCommander once a blocked key is written to.
+  bool OnBlockingWrite() override {
+    auto s = ExecuteUnblocked();
+    return !s.IsNotFound();
+  }
+
+  // Reply used when nothing was popped (presumably on timeout; confirm in BlockingCommander).
+  std::string NoopReply() override { return redis::NilString(); }
+
+  // Key positions: keys start at args[3] and there are args[2] of them.
+  static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector<std::string> &args) {
+    CommandKeyRange range;
+    range.first_key = 3;
+    range.key_step = 1;
+    // This parsing would always succeed as this cmd has been parsed before.
+    auto num_key = *ParseInt<int32_t>(args[2], 10);
+    range.last_key = range.first_key + num_key - 1;
+    return range;
+  };
+
+ private:
+  bool left_ = false;    // true: pop from the head (LEFT); false: from the tail (RIGHT)
+  uint32_t count_ = 1;   // maximum number of elements to pop
+  int64_t timeout_ = 0;  // microseconds
+  std::vector<std::string> keys_;
+  Server *svr_ = nullptr;
+};
+
 class CommandLRem : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -727,6 +850,7 @@ class CommandLPos : public Commander {
 
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBLPop>("blpop", -3, "write 
no-script", 1, -2, 1),
                         MakeCmdAttr<CommandBRPop>("brpop", -3, "write 
no-script", 1, -2, 1),
+                        MakeCmdAttr<CommandBLMPop>("blmpop", -5, "write 
no-script", CommandBLMPop::keyRangeGen),
                         MakeCmdAttr<CommandLIndex>("lindex", 3, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandLInsert>("linsert", 5, "write", 1, 
1, 1),
                         MakeCmdAttr<CommandLLen>("llen", 2, "read-only", 1, 1, 
1),
diff --git a/tests/gocase/unit/type/list/list_test.go 
b/tests/gocase/unit/type/list/list_test.go
index 0522e783..e7f744e0 100644
--- a/tests/gocase/unit/type/list/list_test.go
+++ b/tests/gocase/unit/type/list/list_test.go
@@ -1177,4 +1177,309 @@ func TestList(t *testing.T) {
                        require.EqualError(t, lmpopNoCount(rdb, ctx, direction, 
key1, key2).Err(), redis.Nil.Error())
                })
        }
+
+       for _, direction := range []string{"LEFT", "RIGHT"} {
+               key1 := "blmpop-list1"
+               key2 := "blmpop-list2"
+               rdb.Del(ctx, key1, key2)
+               require.EqualValues(t, 5, rdb.LPush(ctx, key1, "one", "two", 
"three", "four", "five").Val())
+               require.EqualValues(t, 5, rdb.LPush(ctx, key2, "ONE", "TWO", 
"THREE", "FOUR", "FIVE").Val())
+
+               zeroTimeout := time.Second * 0
+
+               // TEST SUITE #1: non-blocking scenario (at least one queried list is not empty).
+               // In these cases, the behavior should be the same as LMPOP.
+               t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countSingle 
%s", direction), func(t *testing.T) {
+                       result := rdb.BLMPop(ctx, zeroTimeout, direction, 1, 
key1)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key1, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"five"}, resultVal)
+                       } else {
+                               require.Equal(t, []string{"one"}, resultVal)
+                       }
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countMulti %s", 
direction), func(t *testing.T) {
+                       result := rdb.BLMPop(ctx, zeroTimeout, direction, 2, 
key1)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key1, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"four", "three"}, 
resultVal)
+                       } else {
+                               require.Equal(t, []string{"two", "three"}, 
resultVal)
+                       }
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countTooMuch 
%s", direction), func(t *testing.T) {
+                       result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, 
key1)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key1, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"two", "one"}, 
resultVal)
+                       } else {
+                               require.Equal(t, []string{"four", "five"}, 
resultVal)
+                       }
+               })
+
+               require.EqualValues(t, 2, rdb.LPush(ctx, key1, "six", 
"seven").Val())
+               t.Run(fmt.Sprintf("BLMPOP test unblocked firstKey countOver 
%s", direction), func(t *testing.T) {
+                       result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, 
key1, key2)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key1, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"seven", "six"}, 
resultVal)
+                       } else {
+                               require.Equal(t, []string{"six", "seven"}, 
resultVal)
+                       }
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countSingle 
%s", direction), func(t *testing.T) {
+                       result := rdb.BLMPop(ctx, zeroTimeout, direction, 1, 
key1, key2)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key2, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"FIVE"}, resultVal)
+                       } else {
+                               require.Equal(t, []string{"ONE"}, resultVal)
+                       }
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countMulti 
%s", direction), func(t *testing.T) {
+                       result := rdb.BLMPop(ctx, zeroTimeout, direction, 2, 
key1, key2)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key2, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"FOUR", "THREE"}, 
resultVal)
+                       } else {
+                               require.Equal(t, []string{"TWO", "THREE"}, 
resultVal)
+                       }
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countOver 
%s", direction), func(t *testing.T) {
+                       result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, 
key1, key2)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key2, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"TWO", "ONE"}, 
resultVal)
+                       } else {
+                               require.Equal(t, []string{"FOUR", "FIVE"}, 
resultVal)
+                       }
+               })
+
+               blmpopNoCount := func(c *redis.Client, ctx context.Context, 
timeout string, direction string, keys ...string) *redis.KeyValuesCmd {
+                       args := make([]interface{}, 3+len(keys), 6+len(keys))
+                       args[0] = "blmpop"
+                       args[1] = timeout
+                       args[2] = len(keys)
+                       for i, key := range keys {
+                               args[3+i] = key
+                       }
+                       args = append(args, strings.ToLower(direction))
+                       cmd := redis.NewKeyValuesCmd(ctx, args...)
+                       _ = c.Process(ctx, cmd)
+                       return cmd
+               }
+               rdb.Del(ctx, key1, key2)
+               require.EqualValues(t, 2, rdb.LPush(ctx, key1, "one", 
"two").Val())
+               require.EqualValues(t, 2, rdb.LPush(ctx, key2, "ONE", 
"TWO").Val())
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey noCount one 
%s", direction), func(t *testing.T) {
+                       result := blmpopNoCount(rdb, ctx, "0", direction, key1)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key1, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"two"}, resultVal)
+                       } else {
+                               require.Equal(t, []string{"one"}, resultVal)
+                       }
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked firstKey noCount one 
%s", direction), func(t *testing.T) {
+                       result := blmpopNoCount(rdb, ctx, "0", direction, key1, 
key2)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key1, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"one"}, resultVal)
+                       } else {
+                               require.Equal(t, []string{"two"}, resultVal)
+                       }
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey noCount one 
%s", direction), func(t *testing.T) {
+                       result := blmpopNoCount(rdb, ctx, "0", direction, key1, 
key2)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key2, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"TWO"}, resultVal)
+                       } else {
+                               require.Equal(t, []string{"ONE"}, resultVal)
+                       }
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey noCount one 
%s", direction), func(t *testing.T) {
+                       result := blmpopNoCount(rdb, ctx, "0", direction, key1, 
key2)
+                       resultKey, resultVal := result.Val()
+                       require.NoError(t, result.Err())
+                       require.EqualValues(t, key2, resultKey)
+                       if direction == "LEFT" {
+                               require.Equal(t, []string{"ONE"}, resultVal)
+                       } else {
+                               require.Equal(t, []string{"TWO"}, resultVal)
+                       }
+               })
+
+               // TEST SUITE #2: blocking scenario, but data arrives within the timeout.
+               t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey 
countSingle %s", direction), func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "1", "1", 
key1, direction, "count", "1"))
+                       time.Sleep(time.Millisecond * 100)
+                       require.NoError(t, rdb.RPush(ctx, key1, "ONE", 
"TWO").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       if direction == "LEFT" {
+                               rd.MustReadStringsWithKey(t, key1, 
[]string{"ONE"})
+                       } else {
+                               rd.MustReadStringsWithKey(t, key1, 
[]string{"TWO"})
+                       }
+                       require.EqualValues(t, 1, rdb.Exists(ctx, key1).Val())
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey countMulti 
%s", direction), func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "1", "1", 
key1, direction, "count", "2"))
+                       time.Sleep(time.Millisecond * 100)
+                       require.NoError(t, rdb.RPush(ctx, key1, "ONE", 
"TWO").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       if direction == "LEFT" {
+                               rd.MustReadStringsWithKey(t, key1, 
[]string{"ONE", "TWO"})
+                       } else {
+                               rd.MustReadStringsWithKey(t, key1, 
[]string{"TWO", "ONE"})
+                       }
+                       require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val())
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey countOver 
%s", direction), func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "1", "1", 
key1, direction, "count", "10"))
+                       time.Sleep(time.Millisecond * 100)
+                       require.NoError(t, rdb.RPush(ctx, key1, "ONE", 
"TWO").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       if direction == "LEFT" {
+                               rd.MustReadStringsWithKey(t, key1, 
[]string{"ONE", "TWO"})
+                       } else {
+                               rd.MustReadStringsWithKey(t, key1, 
[]string{"TWO", "ONE"})
+                       }
+                       require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val())
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test blocked served firstKey 
countOver %s", direction), func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "1", "2", 
key1, key2, direction, "count", "2"))
+                       time.Sleep(time.Millisecond * 100)
+                       require.NoError(t, rdb.RPush(ctx, key1, "ONE", 
"TWO").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       if direction == "LEFT" {
+                               rd.MustReadStringsWithKey(t, key1, 
[]string{"ONE", "TWO"})
+                       } else {
+                               rd.MustReadStringsWithKey(t, key1, 
[]string{"TWO", "ONE"})
+                       }
+                       require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val())
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test blocked served secondKey 
countOver %s", direction), func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "1", "2", 
key1, key2, direction, "count", "2"))
+                       time.Sleep(time.Millisecond * 100)
+                       require.NoError(t, rdb.RPush(ctx, key2, "one", 
"two").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       if direction == "LEFT" {
+                               rd.MustReadStringsWithKey(t, key2, 
[]string{"one", "two"})
+                       } else {
+                               rd.MustReadStringsWithKey(t, key2, 
[]string{"two", "one"})
+                       }
+                       require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val())
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test blocked served bothKey FIFO 
countOver %s", direction), func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "1", "2", 
key1, key2, direction, "count", "2"))
+                       time.Sleep(time.Millisecond * 100)
+                       require.NoError(t, rdb.RPush(ctx, key2, "one", 
"two").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       require.NoError(t, rdb.RPush(ctx, key1, "ONE", 
"TWO").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       if direction == "LEFT" {
+                               rd.MustReadStringsWithKey(t, key2, 
[]string{"one", "two"})
+                       } else {
+                               rd.MustReadStringsWithKey(t, key2, 
[]string{"two", "one"})
+                       }
+                       require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val())
+                       require.EqualValues(t, 2, rdb.LLen(ctx, key1).Val())
+               })
+
+               t.Run(fmt.Sprintf("BLMPOP test blocked served secondKey noCount 
%s", direction), func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "1", "2", 
key1, key2, direction))
+                       time.Sleep(time.Millisecond * 100)
+                       require.NoError(t, rdb.RPush(ctx, key2, "one", 
"two").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       if direction == "LEFT" {
+                               rd.MustReadStringsWithKey(t, key2, 
[]string{"one"})
+                       } else {
+                               rd.MustReadStringsWithKey(t, key2, 
[]string{"two"})
+                       }
+                       require.EqualValues(t, 1, rdb.LLen(ctx, key2).Val())
+               })
+
+               // TEST SUITE #3: blocking scenario, and timeout is triggered.
+               t.Run(fmt.Sprintf("BLMPOP test blocked timeout %s", direction), 
func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "1", "2", 
key1, key2, direction))
+                       time.Sleep(time.Millisecond * 1200)
+                       rd.MustMatch(t, "")
+               })
+
+               // TEST SUITE #4: blocking scenario, and timeout is 0 (permanently blocked).
+               t.Run(fmt.Sprintf("BLMPOP test blocked infinitely served 
secondKey countOver %s", direction), func(t *testing.T) {
+                       rd := srv.NewTCPClient()
+                       defer func() { require.NoError(t, rd.Close()) }()
+                       require.NoError(t, rdb.Del(ctx, key1, key2).Err())
+                       require.NoError(t, rd.WriteArgs("blmpop", "0", "2", 
key1, key2, direction, "count", "2"))
+                       time.Sleep(time.Millisecond * 1200)
+                       require.NoError(t, rdb.RPush(ctx, key2, "one", 
"two").Err())
+                       time.Sleep(time.Millisecond * 100)
+                       if direction == "LEFT" {
+                               rd.MustReadStringsWithKey(t, key2, 
[]string{"one", "two"})
+                       } else {
+                               rd.MustReadStringsWithKey(t, key2, 
[]string{"two", "one"})
+                       }
+                       require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val())
+               })
+       }
 }
diff --git a/tests/gocase/util/tcp_client.go b/tests/gocase/util/tcp_client.go
index 1a65bcb1..46ed3ac9 100644
--- a/tests/gocase/util/tcp_client.go
+++ b/tests/gocase/util/tcp_client.go
@@ -87,6 +87,21 @@ func (c *TCPClient) MustReadStrings(t testing.TB, s 
[]string) {
        }
 }
 
+// MustReadStringsWithKey asserts that the next reply on the connection is a
+// two-element RESP array: the bulk string key followed by an array of bulk
+// strings equal to s. Any mismatch or read error fails the test via require.
+func (c *TCPClient) MustReadStringsWithKey(t testing.TB, key string, s []string) {
+       // Outer array header, expected to be "*2".
+       r, err := c.ReadLine()
+       require.NoError(t, err)
+       require.NotEmpty(t, r) // guard r[0] against an empty line
+       require.EqualValues(t, '*', r[0])
+       n, err := strconv.Atoi(r[1:])
+       require.NoError(t, err)
+       require.Equal(t, 2, n)
+
+       // First element: bulk-string header "$<len>", then the key itself.
+       // Assumes MustRead compares one whole line, as its use on key implies.
+       c.MustRead(t, "$"+strconv.Itoa(len(key)))
+       c.MustRead(t, key)
+
+       // Second element: the array of popped values.
+       c.MustReadStrings(t, s)
+}
+
 func (c *TCPClient) MustMatch(t testing.TB, rx string) {
        r, err := c.ReadLine()
        require.NoError(t, err)

Reply via email to