This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 695568f4 Add the support of the LPOS command (#1681)
695568f4 is described below

commit 695568f4ef14dd3495ea176bfe76f29ef500dd96
Author: Jover <[email protected]>
AuthorDate: Sat Aug 19 20:42:37 2023 +0800

    Add the support of the LPOS command (#1681)
---
 src/commands/cmd_list.cc                 | 69 +++++++++++++++++++++++++
 src/types/redis_list.cc                  | 57 ++++++++++++++++++++
 src/types/redis_list.h                   |  9 ++++
 tests/cppunit/types/list_test.cc         | 89 ++++++++++++++++++++++++++++++++
 tests/gocase/unit/type/list/list_test.go | 54 +++++++++++++++++++
 5 files changed, 278 insertions(+)

diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index 9f178765..60621fdf 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -19,6 +19,7 @@
  */
 
 #include "commander.h"
+#include "commands/command_parser.h"
 #include "error_constants.h"
 #include "event_util.h"
 #include "server/server.h"
@@ -680,6 +681,73 @@ class CommandBLMove : public Commander,
   void unblockOnSrc() { svr_->UnblockOnKey(args_[1], conn_); }
 };
 
+class CommandLPos : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() > 9) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    CommandParser parser(args, 3);
+    while (parser.Good()) {
+      if (parser.EatEqICase("rank")) {
+        spec_.rank = GET_OR_RET(parser.TakeInt());
+        if (spec_.rank == 0) {
+          return {Status::RedisParseErr,
+                  "RANK can't be zero: use 1 to start from "
+                  "the first match, 2 from the second ... "
+                  "or use negative to start from the end of the list"};
+        }
+      } else if (parser.EatEqICase("count")) {
+        spec_.count = GET_OR_RET(parser.TakeInt());
+        if (spec_.count < 0) {
+          return {Status::RedisExecErr, "COUNT can't be negative"};
+        }
+      } else if (parser.EatEqICase("maxlen")) {
+        spec_.max_len = GET_OR_RET(parser.TakeInt());
+        if (spec_.max_len < 0) {
+          return {Status::RedisExecErr, "MAXLEN can't be negative"};
+        }
+      } else {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+    }
+    return Status::OK();
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::List list_db(svr->storage, conn->GetNamespace());
+    std::vector<int64_t> indexes;
+    auto s = list_db.Pos(args_[1], args_[2], spec_, &indexes);
+    if (!s.ok() && !s.IsNotFound()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    // We return nil or a single value if `COUNT` option is not given.
+    if (!spec_.count.has_value()) {
+      if (s.IsNotFound() || indexes.empty()) {
+        *output = redis::NilString();
+      } else {
+        assert(indexes.size() == 1);
+        *output = redis::Integer(indexes[0]);
+      }
+    }
+    // Otherwise we return an array.
+    else {
+      std::vector<std::string> values;
+      values.reserve(indexes.size());
+      for (const auto &index : indexes) {
+        values.emplace_back(std::to_string(index));
+      }
+      *output = redis::MultiBulkString(values, false);
+    }
+    return Status::OK();
+  }
+
+ private:
+  PosSpec spec_;
+};
+
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBLPop>("blpop", -3, "write 
no-script", 1, -2, 1),
                         MakeCmdAttr<CommandBRPop>("brpop", -3, "write 
no-script", 1, -2, 1),
                         MakeCmdAttr<CommandLIndex>("lindex", 3, "read-only", 
1, 1, 1),
@@ -688,6 +756,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBLPop>("blpop", 
-3, "write no-script"
                         MakeCmdAttr<CommandLMove>("lmove", 5, "write", 1, 2, 
1),
                         MakeCmdAttr<CommandBLMove>("blmove", 6, "write", 1, 2, 
1),
                         MakeCmdAttr<CommandLPop>("lpop", -2, "write", 1, 1, 
1),  //
+                        MakeCmdAttr<CommandLPos>("lpos", -3, "read-only", 1, 
1, 1),
                         MakeCmdAttr<CommandLPush>("lpush", -3, "write", 1, 1, 
1),
                         MakeCmdAttr<CommandLPushX>("lpushx", -3, "write", 1, 
1, 1),
                         MakeCmdAttr<CommandLRange>("lrange", 4, "read-only", 
1, 1, 1),
diff --git a/src/types/redis_list.cc b/src/types/redis_list.cc
index 34db5be0..b862a330 100644
--- a/src/types/redis_list.cc
+++ b/src/types/redis_list.cc
@@ -392,6 +392,63 @@ rocksdb::Status List::Range(const Slice &user_key, int 
start, int stop, std::vec
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status List::Pos(const Slice &user_key, const Slice &elem, const 
PosSpec &spec,
+                          std::vector<int64_t> *indexes) {
+  indexes->clear();
+
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  ListMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok()) return s;
+
+  // A negative rank means start from the tail.
+  int64_t rank = spec.rank;
+  uint64_t start = metadata.head;
+  bool reversed = false;
+  if (rank < 0) {
+    rank = -rank;
+    start = metadata.tail - 1;
+    reversed = true;
+  }
+
+  std::string buf;
+  PutFixed64(&buf, start);
+  std::string start_key = InternalKey(ns_key, buf, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  std::string prefix = InternalKey(ns_key, "", metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 
1, storage_->IsSlotIdEncoded()).Encode();
+
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  LatestSnapShot ss(storage_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(next_version_prefix);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto list_len = static_cast<int64_t>(metadata.size);
+  int64_t max_len = spec.max_len;
+  int64_t count = spec.count.value_or(-1);
+  int64_t offset = 0, matches = 0;
+
+  auto iter = util::UniqueIterator(storage_, read_options);
+  iter->Seek(start_key);
+  while (iter->Valid() && iter->key().starts_with(prefix) && (max_len == 0 || 
offset < max_len)) {
+    if (iter->value() == elem) {
+      matches++;
+      if (matches >= rank) {
+        int64_t pos = !reversed ? offset : list_len - offset - 1;
+        indexes->push_back(pos);
+        if (count != 0 && matches - rank + 1 >= count) {
+          break;
+        }
+      }
+    }
+    offset++;
+    !reversed ? iter->Next() : iter->Prev();
+  }
+  return rocksdb::Status::OK();
+}
+
 rocksdb::Status List::Set(const Slice &user_key, int index, Slice elem) {
   std::string ns_key = AppendNamespacePrefix(user_key);
 
diff --git a/src/types/redis_list.h b/src/types/redis_list.h
index 2cea147d..cf0effd8 100644
--- a/src/types/redis_list.h
+++ b/src/types/redis_list.h
@@ -22,6 +22,7 @@
 
 #include <stdint.h>
 
+#include <optional>
 #include <string>
 #include <vector>
 
@@ -29,6 +30,13 @@
 #include "storage/redis_db.h"
 #include "storage/redis_metadata.h"
 
+struct PosSpec {
+  int64_t rank = 1;
+  std::optional<int64_t> count = std::nullopt;
+  int64_t max_len = 0;
+  explicit PosSpec() = default;
+};
+
 namespace redis {
 class List : public Database {
  public:
@@ -45,6 +53,7 @@ class List : public Database {
   rocksdb::Status Push(const Slice &user_key, const std::vector<Slice> &elems, 
bool left, uint64_t *new_size);
   rocksdb::Status PushX(const Slice &user_key, const std::vector<Slice> 
&elems, bool left, uint64_t *new_size);
   rocksdb::Status Range(const Slice &user_key, int start, int stop, 
std::vector<std::string> *elems);
+  rocksdb::Status Pos(const Slice &user_key, const Slice &elem, const PosSpec 
&spec, std::vector<int64_t> *indexes);
 
  private:
   rocksdb::Status GetMetadata(const Slice &ns_key, ListMetadata *metadata);
diff --git a/tests/cppunit/types/list_test.cc b/tests/cppunit/types/list_test.cc
index c723621c..fcb23aa5 100644
--- a/tests/cppunit/types/list_test.cc
+++ b/tests/cppunit/types/list_test.cc
@@ -31,14 +31,20 @@ class RedisListTest : public TestBase {
   ~RedisListTest() override = default;
 
   void SetUp() override {
+    // Assume that `field_` is a matrix of size `m_` * `n_`,
+    // where every row is identical, and each element within a single row is 
distinct.
     key_ = "test-list-key";
     fields_ = {"list-test-key-1", "list-test-key-2", "list-test-key-3", 
"list-test-key-4", "list-test-key-5",
                "list-test-key-1", "list-test-key-2", "list-test-key-3", 
"list-test-key-4", "list-test-key-5",
                "list-test-key-1", "list-test-key-2", "list-test-key-3", 
"list-test-key-4", "list-test-key-5",
                "list-test-key-1", "list-test-key-2", "list-test-key-3", 
"list-test-key-4", "list-test-key-5"};
+    m_ = 4;
+    n_ = 5;
   }
 
   std::unique_ptr<redis::List> list_;
+  int n_;
+  int m_;
 };
 
 class RedisListSpecificTest : public RedisListTest {
@@ -162,6 +168,89 @@ TEST_F(RedisListTest, Range) {
   list_->Del(key_);
 }
 
+TEST_F(RedisListTest, Pos) {
+  uint64_t ret = 0;
+  list_->Push(key_, fields_, false, &ret);
+  EXPECT_EQ(fields_.size(), ret);
+
+  // Basic usage
+  PosSpec spec;
+  std::vector<int64_t> indexes;
+  list_->Pos(key_, fields_[0], spec, &indexes);
+  EXPECT_EQ(1, indexes.size());
+  EXPECT_EQ(0, indexes[0]);
+  list_->Pos(key_, fields_[2], spec, &indexes);
+  EXPECT_EQ(1, indexes.size());
+  EXPECT_EQ(2, indexes[0]);
+
+  // RANK option
+  spec = PosSpec{};
+  spec.rank = m_ + 1;
+  auto s = list_->Pos(key_, fields_[3], spec, &indexes);
+  EXPECT_TRUE(s.ok());
+  EXPECT_TRUE(indexes.empty());
+  spec.rank = -(m_ + 1);
+  s = list_->Pos(key_, fields_[3], spec, &indexes);
+  EXPECT_TRUE(s.ok());
+  EXPECT_TRUE(indexes.empty());
+  // positive
+  for (int i = 1; i <= m_; ++i) {
+    spec.rank = i;
+    list_->Pos(key_, fields_[3], spec, &indexes);
+    EXPECT_EQ(1, indexes.size());
+    EXPECT_EQ(n_ * (i - 1) + 3, indexes[0]);
+  }
+  // negative
+  for (int i = 1; i <= m_; ++i) {
+    spec.rank = -i;
+    list_->Pos(key_, fields_[3], spec, &indexes);
+    EXPECT_EQ(1, indexes.size());
+    EXPECT_EQ(fields_.size() - n_ * i + 3, indexes[0]);
+  }
+
+  // COUNT option
+  spec = PosSpec{};
+  spec.count = 0;
+  list_->Pos(key_, fields_[3], spec, &indexes);
+  EXPECT_EQ(m_, indexes.size());
+  EXPECT_EQ(3, indexes[0]);
+  spec.count = 2;
+  list_->Pos(key_, fields_[3], spec, &indexes);
+  EXPECT_EQ(2, indexes.size());
+  EXPECT_EQ(3, indexes[0]);
+  spec.count = 100;
+  list_->Pos(key_, fields_[3], spec, &indexes);
+  EXPECT_EQ(m_, indexes.size());
+  EXPECT_EQ(3, indexes[0]);
+
+  // COUNT + RANK option
+  spec = PosSpec{};
+  spec.count = 0;
+  spec.rank = 2;
+  list_->Pos(key_, fields_[3], spec, &indexes);
+  EXPECT_EQ(n_ - 2, indexes.size());
+  spec.count = 2;
+  spec.rank = -2;
+  list_->Pos(key_, fields_[3], spec, &indexes);
+  EXPECT_EQ(2, indexes.size());
+
+  // MAXLEN option
+  PosSpec maxlen_spec;
+  maxlen_spec.max_len = 2;
+  list_->Pos(key_, fields_[2], maxlen_spec, &indexes);
+  EXPECT_TRUE(indexes.empty());
+  list_->Pos(key_, fields_[1], maxlen_spec, &indexes);
+  EXPECT_EQ(1, indexes.size());
+  EXPECT_EQ(1, indexes[0]);
+  maxlen_spec.count = 0;
+  maxlen_spec.max_len = (n_ * 2);
+  list_->Pos(key_, fields_[3], maxlen_spec, &indexes);
+  EXPECT_EQ(2, indexes.size());
+  EXPECT_EQ(3, indexes[0]);
+
+  list_->Del(key_);
+}
+
 TEST_F(RedisListTest, Rem) {
   uint64_t ret = 0;
   uint64_t len = 0;
diff --git a/tests/gocase/unit/type/list/list_test.go 
b/tests/gocase/unit/type/list/list_test.go
index 05199997..2cf2e317 100644
--- a/tests/gocase/unit/type/list/list_test.go
+++ b/tests/gocase/unit/type/list/list_test.go
@@ -941,4 +941,58 @@ func TestList(t *testing.T) {
                rd.MustRead(t, "$3")
                require.Equal(t, "bar", rdb.LRange(ctx, "target", 0, 
-1).Val()[0])
        })
+
+       for listType, large := range largeValue {
+               t.Run(fmt.Sprintf("LPOS basic usage - %s", listType), func(t 
*testing.T) {
+                       createList("mylist", []string{"a", "b", "c", large, 
"2", "3", "c", "c"})
+                       require.Equal(t, int64(0), rdb.LPos(ctx, "mylist", "a", 
redis.LPosArgs{}).Val())
+                       require.Equal(t, int64(2), rdb.LPos(ctx, "mylist", "c", 
redis.LPosArgs{}).Val())
+               })
+
+               t.Run("LPOS RANK option", func(t *testing.T) {
+                       require.Equal(t, int64(2), rdb.LPos(ctx, "mylist", "c", 
redis.LPosArgs{Rank: 1}).Val())
+                       require.Equal(t, int64(6), rdb.LPos(ctx, "mylist", "c", 
redis.LPosArgs{Rank: 2}).Val())
+                       require.Error(t, rdb.LPos(ctx, "mylist", "c", 
redis.LPosArgs{Rank: 4}).Err())
+                       require.Equal(t, int64(7), rdb.LPos(ctx, "mylist", "c", 
redis.LPosArgs{Rank: -1}).Val())
+                       require.Equal(t, int64(6), rdb.LPos(ctx, "mylist", "c", 
redis.LPosArgs{Rank: -2}).Val())
+                       err := rdb.Do(ctx, "LPOS", "mylist", "c", "RANK", 
0).Err()
+                       require.Error(t, err)
+                       require.True(t, strings.Contains(err.Error(), "RANK 
can't be zero"))
+               })
+
+               t.Run("LPOS COUNT option", func(t *testing.T) {
+                       require.Equal(t, []int64{2, 6, 7}, rdb.LPosCount(ctx, 
"mylist", "c", 0, redis.LPosArgs{}).Val())
+                       require.Equal(t, []int64{2}, rdb.LPosCount(ctx, 
"mylist", "c", 1, redis.LPosArgs{}).Val())
+                       require.Equal(t, []int64{2, 6}, rdb.LPosCount(ctx, 
"mylist", "c", 2, redis.LPosArgs{}).Val())
+                       require.Equal(t, []int64{2, 6, 7}, rdb.LPosCount(ctx, 
"mylist", "c", 100, redis.LPosArgs{}).Val())
+               })
+
+               t.Run("LPOS COUNT + RANK option", func(t *testing.T) {
+                       require.Equal(t, []int64{6, 7}, rdb.LPosCount(ctx, 
"mylist", "c", 0, redis.LPosArgs{Rank: 2}).Val())
+                       require.Equal(t, []int64{7, 6}, rdb.LPosCount(ctx, 
"mylist", "c", 2, redis.LPosArgs{Rank: -1}).Val())
+               })
+
+               t.Run("LPOS non existing key", func(t *testing.T) {
+                       require.Empty(t, rdb.LPosCount(ctx, "mylistxxx", "c", 
0, redis.LPosArgs{Rank: 2}).Val())
+               })
+
+               t.Run("LPOS no match", func(t *testing.T) {
+                       require.Empty(t, rdb.LPosCount(ctx, "mylist", "x", 2, 
redis.LPosArgs{Rank: -1}).Val())
+                       require.Empty(t, rdb.LPos(ctx, "mylist", "x", 
redis.LPosArgs{Rank: -1}).Val())
+               })
+
+               t.Run("LPOS MAXLEN", func(t *testing.T) {
+                       require.Equal(t, []int64{0}, rdb.LPosCount(ctx, 
"mylist", "a", 0, redis.LPosArgs{MaxLen: 1}).Val())
+                       require.Empty(t, rdb.LPosCount(ctx, "mylist", "c", 0, 
redis.LPosArgs{MaxLen: 1}).Val())
+                       require.Equal(t, []int64{2}, rdb.LPosCount(ctx, 
"mylist", "c", 0, redis.LPosArgs{MaxLen: 3}).Val())
+                       require.Equal(t, []int64{7, 6}, rdb.LPosCount(ctx, 
"mylist", "c", 0, redis.LPosArgs{MaxLen: 3, Rank: -1}).Val())
+                       require.Equal(t, []int64{6}, rdb.LPosCount(ctx, 
"mylist", "c", 0, redis.LPosArgs{MaxLen: 7, Rank: 2}).Val())
+               })
+
+               t.Run("LPOS when RANK is greater than matches", func(t 
*testing.T) {
+                       require.NoError(t, rdb.Del(ctx, "mylist").Err())
+                       require.NoError(t, rdb.LPush(ctx, "mylist", "a").Err())
+                       require.Empty(t, rdb.LPosCount(ctx, "mylist", "b", 10, 
redis.LPosArgs{Rank: 5}).Val())
+               })
+       }
 }

Reply via email to