This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 695568f4 Add the support of the LPOS command (#1681)
695568f4 is described below
commit 695568f4ef14dd3495ea176bfe76f29ef500dd96
Author: Jover <[email protected]>
AuthorDate: Sat Aug 19 20:42:37 2023 +0800
Add the support of the LPOS command (#1681)
---
src/commands/cmd_list.cc | 69 +++++++++++++++++++++++++
src/types/redis_list.cc | 57 ++++++++++++++++++++
src/types/redis_list.h | 9 ++++
tests/cppunit/types/list_test.cc | 89 ++++++++++++++++++++++++++++++++
tests/gocase/unit/type/list/list_test.go | 54 +++++++++++++++++++
5 files changed, 278 insertions(+)
diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index 9f178765..60621fdf 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -19,6 +19,7 @@
*/
#include "commander.h"
+#include "commands/command_parser.h"
#include "error_constants.h"
#include "event_util.h"
#include "server/server.h"
@@ -680,6 +681,73 @@ class CommandBLMove : public Commander,
void unblockOnSrc() { svr_->UnblockOnKey(args_[1], conn_); }
};
+class CommandLPos : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() > 9) {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+
+ CommandParser parser(args, 3);
+ while (parser.Good()) {
+ if (parser.EatEqICase("rank")) {
+ spec_.rank = GET_OR_RET(parser.TakeInt());
+ if (spec_.rank == 0) {
+ return {Status::RedisParseErr,
+ "RANK can't be zero: use 1 to start from "
+ "the first match, 2 from the second ... "
+ "or use negative to start from the end of the list"};
+ }
+ } else if (parser.EatEqICase("count")) {
+ spec_.count = GET_OR_RET(parser.TakeInt());
+ if (spec_.count < 0) {
+ return {Status::RedisExecErr, "COUNT can't be negative"};
+ }
+ } else if (parser.EatEqICase("maxlen")) {
+ spec_.max_len = GET_OR_RET(parser.TakeInt());
+ if (spec_.max_len < 0) {
+ return {Status::RedisExecErr, "MAXLEN can't be negative"};
+ }
+ } else {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Execute(Server *svr, Connection *conn, std::string *output) override {
+ redis::List list_db(svr->storage, conn->GetNamespace());
+ std::vector<int64_t> indexes;
+ auto s = list_db.Pos(args_[1], args_[2], spec_, &indexes);
+ if (!s.ok() && !s.IsNotFound()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ // We return nil or a single value if `COUNT` option is not given.
+ if (!spec_.count.has_value()) {
+ if (s.IsNotFound() || indexes.empty()) {
+ *output = redis::NilString();
+ } else {
+ assert(indexes.size() == 1);
+ *output = redis::Integer(indexes[0]);
+ }
+ }
+ // Otherwise we return an array.
+ else {
+ std::vector<std::string> values;
+ values.reserve(indexes.size());
+ for (const auto &index : indexes) {
+ values.emplace_back(std::to_string(index));
+ }
+ *output = redis::MultiBulkString(values, false);
+ }
+ return Status::OK();
+ }
+
+ private:
+ PosSpec spec_;
+};
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBLPop>("blpop", -3, "write
no-script", 1, -2, 1),
MakeCmdAttr<CommandBRPop>("brpop", -3, "write
no-script", 1, -2, 1),
MakeCmdAttr<CommandLIndex>("lindex", 3, "read-only",
1, 1, 1),
@@ -688,6 +756,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBLPop>("blpop",
-3, "write no-script"
MakeCmdAttr<CommandLMove>("lmove", 5, "write", 1, 2,
1),
MakeCmdAttr<CommandBLMove>("blmove", 6, "write", 1, 2,
1),
MakeCmdAttr<CommandLPop>("lpop", -2, "write", 1, 1,
1), //
+ MakeCmdAttr<CommandLPos>("lpos", -3, "read-only", 1,
1, 1),
MakeCmdAttr<CommandLPush>("lpush", -3, "write", 1, 1,
1),
MakeCmdAttr<CommandLPushX>("lpushx", -3, "write", 1,
1, 1),
MakeCmdAttr<CommandLRange>("lrange", 4, "read-only",
1, 1, 1),
diff --git a/src/types/redis_list.cc b/src/types/redis_list.cc
index 34db5be0..b862a330 100644
--- a/src/types/redis_list.cc
+++ b/src/types/redis_list.cc
@@ -392,6 +392,63 @@ rocksdb::Status List::Range(const Slice &user_key, int
start, int stop, std::vec
return rocksdb::Status::OK();
}
+rocksdb::Status List::Pos(const Slice &user_key, const Slice &elem, const
PosSpec &spec,
+ std::vector<int64_t> *indexes) {
+ indexes->clear();
+
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ ListMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) return s;
+
+ // A negative rank means start from the tail.
+ int64_t rank = spec.rank;
+ uint64_t start = metadata.head;
+ bool reversed = false;
+ if (rank < 0) {
+ rank = -rank;
+ start = metadata.tail - 1;
+ reversed = true;
+ }
+
+ std::string buf;
+ PutFixed64(&buf, start);
+ std::string start_key = InternalKey(ns_key, buf, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string next_version_prefix = InternalKey(ns_key, "", metadata.version +
1, storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(next_version_prefix);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto list_len = static_cast<int64_t>(metadata.size);
+ int64_t max_len = spec.max_len;
+ int64_t count = spec.count.value_or(-1);
+ int64_t offset = 0, matches = 0;
+
+ auto iter = util::UniqueIterator(storage_, read_options);
+ iter->Seek(start_key);
+ while (iter->Valid() && iter->key().starts_with(prefix) && (max_len == 0 ||
offset < max_len)) {
+ if (iter->value() == elem) {
+ matches++;
+ if (matches >= rank) {
+ int64_t pos = !reversed ? offset : list_len - offset - 1;
+ indexes->push_back(pos);
+ if (count != 0 && matches - rank + 1 >= count) {
+ break;
+ }
+ }
+ }
+ offset++;
+ !reversed ? iter->Next() : iter->Prev();
+ }
+ return rocksdb::Status::OK();
+}
+
rocksdb::Status List::Set(const Slice &user_key, int index, Slice elem) {
std::string ns_key = AppendNamespacePrefix(user_key);
diff --git a/src/types/redis_list.h b/src/types/redis_list.h
index 2cea147d..cf0effd8 100644
--- a/src/types/redis_list.h
+++ b/src/types/redis_list.h
@@ -22,6 +22,7 @@
#include <stdint.h>
+#include <optional>
#include <string>
#include <vector>
@@ -29,6 +30,13 @@
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
+struct PosSpec {
+ int64_t rank = 1;
+ std::optional<int64_t> count = std::nullopt;
+ int64_t max_len = 0;
+ explicit PosSpec() = default;
+};
+
namespace redis {
class List : public Database {
public:
@@ -45,6 +53,7 @@ class List : public Database {
rocksdb::Status Push(const Slice &user_key, const std::vector<Slice> &elems,
bool left, uint64_t *new_size);
rocksdb::Status PushX(const Slice &user_key, const std::vector<Slice>
&elems, bool left, uint64_t *new_size);
rocksdb::Status Range(const Slice &user_key, int start, int stop,
std::vector<std::string> *elems);
+ rocksdb::Status Pos(const Slice &user_key, const Slice &elem, const PosSpec
&spec, std::vector<int64_t> *indexes);
private:
rocksdb::Status GetMetadata(const Slice &ns_key, ListMetadata *metadata);
diff --git a/tests/cppunit/types/list_test.cc b/tests/cppunit/types/list_test.cc
index c723621c..fcb23aa5 100644
--- a/tests/cppunit/types/list_test.cc
+++ b/tests/cppunit/types/list_test.cc
@@ -31,14 +31,20 @@ class RedisListTest : public TestBase {
~RedisListTest() override = default;
void SetUp() override {
+ // Assume that `field_` is a matrix of size `m_` * `n_`,
+ // where every row is identical, and each element within a single row is
distinct.
key_ = "test-list-key";
fields_ = {"list-test-key-1", "list-test-key-2", "list-test-key-3",
"list-test-key-4", "list-test-key-5",
"list-test-key-1", "list-test-key-2", "list-test-key-3",
"list-test-key-4", "list-test-key-5",
"list-test-key-1", "list-test-key-2", "list-test-key-3",
"list-test-key-4", "list-test-key-5",
"list-test-key-1", "list-test-key-2", "list-test-key-3",
"list-test-key-4", "list-test-key-5"};
+ m_ = 4;
+ n_ = 5;
}
std::unique_ptr<redis::List> list_;
+ int n_;
+ int m_;
};
class RedisListSpecificTest : public RedisListTest {
@@ -162,6 +168,89 @@ TEST_F(RedisListTest, Range) {
list_->Del(key_);
}
+TEST_F(RedisListTest, Pos) {
+ uint64_t ret = 0;
+ list_->Push(key_, fields_, false, &ret);
+ EXPECT_EQ(fields_.size(), ret);
+
+ // Basic usage
+ PosSpec spec;
+ std::vector<int64_t> indexes;
+ list_->Pos(key_, fields_[0], spec, &indexes);
+ EXPECT_EQ(1, indexes.size());
+ EXPECT_EQ(0, indexes[0]);
+ list_->Pos(key_, fields_[2], spec, &indexes);
+ EXPECT_EQ(1, indexes.size());
+ EXPECT_EQ(2, indexes[0]);
+
+ // RANK option
+ spec = PosSpec{};
+ spec.rank = m_ + 1;
+ auto s = list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_TRUE(s.ok());
+ EXPECT_TRUE(indexes.empty());
+ spec.rank = -(m_ + 1);
+ s = list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_TRUE(s.ok());
+ EXPECT_TRUE(indexes.empty());
+ // positive
+ for (int i = 1; i <= m_; ++i) {
+ spec.rank = i;
+ list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_EQ(1, indexes.size());
+ EXPECT_EQ(n_ * (i - 1) + 3, indexes[0]);
+ }
+ // negative
+ for (int i = 1; i <= m_; ++i) {
+ spec.rank = -i;
+ list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_EQ(1, indexes.size());
+ EXPECT_EQ(fields_.size() - n_ * i + 3, indexes[0]);
+ }
+
+ // COUNT option
+ spec = PosSpec{};
+ spec.count = 0;
+ list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_EQ(m_, indexes.size());
+ EXPECT_EQ(3, indexes[0]);
+ spec.count = 2;
+ list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_EQ(2, indexes.size());
+ EXPECT_EQ(3, indexes[0]);
+ spec.count = 100;
+ list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_EQ(m_, indexes.size());
+ EXPECT_EQ(3, indexes[0]);
+
+ // COUNT + RANK option
+ spec = PosSpec{};
+ spec.count = 0;
+ spec.rank = 2;
+ list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_EQ(n_ - 2, indexes.size());
+ spec.count = 2;
+ spec.rank = -2;
+ list_->Pos(key_, fields_[3], spec, &indexes);
+ EXPECT_EQ(2, indexes.size());
+
+ // MAXLEN option
+ PosSpec maxlen_spec;
+ maxlen_spec.max_len = 2;
+ list_->Pos(key_, fields_[2], maxlen_spec, &indexes);
+ EXPECT_TRUE(indexes.empty());
+ list_->Pos(key_, fields_[1], maxlen_spec, &indexes);
+ EXPECT_EQ(1, indexes.size());
+ EXPECT_EQ(1, indexes[0]);
+ maxlen_spec.count = 0;
+ maxlen_spec.max_len = (n_ * 2);
+ list_->Pos(key_, fields_[3], maxlen_spec, &indexes);
+ EXPECT_EQ(2, indexes.size());
+ EXPECT_EQ(3, indexes[0]);
+
+ list_->Del(key_);
+}
+
TEST_F(RedisListTest, Rem) {
uint64_t ret = 0;
uint64_t len = 0;
diff --git a/tests/gocase/unit/type/list/list_test.go
b/tests/gocase/unit/type/list/list_test.go
index 05199997..2cf2e317 100644
--- a/tests/gocase/unit/type/list/list_test.go
+++ b/tests/gocase/unit/type/list/list_test.go
@@ -941,4 +941,58 @@ func TestList(t *testing.T) {
rd.MustRead(t, "$3")
require.Equal(t, "bar", rdb.LRange(ctx, "target", 0,
-1).Val()[0])
})
+
+ for listType, large := range largeValue {
+ t.Run(fmt.Sprintf("LPOS basic usage - %s", listType), func(t
*testing.T) {
+ createList("mylist", []string{"a", "b", "c", large,
"2", "3", "c", "c"})
+ require.Equal(t, int64(0), rdb.LPos(ctx, "mylist", "a",
redis.LPosArgs{}).Val())
+ require.Equal(t, int64(2), rdb.LPos(ctx, "mylist", "c",
redis.LPosArgs{}).Val())
+ })
+
+ t.Run("LPOS RANK option", func(t *testing.T) {
+ require.Equal(t, int64(2), rdb.LPos(ctx, "mylist", "c",
redis.LPosArgs{Rank: 1}).Val())
+ require.Equal(t, int64(6), rdb.LPos(ctx, "mylist", "c",
redis.LPosArgs{Rank: 2}).Val())
+ require.Error(t, rdb.LPos(ctx, "mylist", "c",
redis.LPosArgs{Rank: 4}).Err())
+ require.Equal(t, int64(7), rdb.LPos(ctx, "mylist", "c",
redis.LPosArgs{Rank: -1}).Val())
+ require.Equal(t, int64(6), rdb.LPos(ctx, "mylist", "c",
redis.LPosArgs{Rank: -2}).Val())
+ err := rdb.Do(ctx, "LPOS", "mylist", "c", "RANK",
0).Err()
+ require.Error(t, err)
+ require.True(t, strings.Contains(err.Error(), "RANK
can't be zero"))
+ })
+
+ t.Run("LPOS COUNT option", func(t *testing.T) {
+ require.Equal(t, []int64{2, 6, 7}, rdb.LPosCount(ctx,
"mylist", "c", 0, redis.LPosArgs{}).Val())
+ require.Equal(t, []int64{2}, rdb.LPosCount(ctx,
"mylist", "c", 1, redis.LPosArgs{}).Val())
+ require.Equal(t, []int64{2, 6}, rdb.LPosCount(ctx,
"mylist", "c", 2, redis.LPosArgs{}).Val())
+ require.Equal(t, []int64{2, 6, 7}, rdb.LPosCount(ctx,
"mylist", "c", 100, redis.LPosArgs{}).Val())
+ })
+
+ t.Run("LPOS COUNT + RANK option", func(t *testing.T) {
+ require.Equal(t, []int64{6, 7}, rdb.LPosCount(ctx,
"mylist", "c", 0, redis.LPosArgs{Rank: 2}).Val())
+ require.Equal(t, []int64{7, 6}, rdb.LPosCount(ctx,
"mylist", "c", 2, redis.LPosArgs{Rank: -1}).Val())
+ })
+
+ t.Run("LPOS non existing key", func(t *testing.T) {
+ require.Empty(t, rdb.LPosCount(ctx, "mylistxxx", "c",
0, redis.LPosArgs{Rank: 2}).Val())
+ })
+
+ t.Run("LPOS no match", func(t *testing.T) {
+ require.Empty(t, rdb.LPosCount(ctx, "mylist", "x", 2,
redis.LPosArgs{Rank: -1}).Val())
+ require.Empty(t, rdb.LPos(ctx, "mylist", "x",
redis.LPosArgs{Rank: -1}).Val())
+ })
+
+ t.Run("LPOS MAXLEN", func(t *testing.T) {
+ require.Equal(t, []int64{0}, rdb.LPosCount(ctx,
"mylist", "a", 0, redis.LPosArgs{MaxLen: 1}).Val())
+ require.Empty(t, rdb.LPosCount(ctx, "mylist", "c", 0,
redis.LPosArgs{MaxLen: 1}).Val())
+ require.Equal(t, []int64{2}, rdb.LPosCount(ctx,
"mylist", "c", 0, redis.LPosArgs{MaxLen: 3}).Val())
+ require.Equal(t, []int64{7, 6}, rdb.LPosCount(ctx,
"mylist", "c", 0, redis.LPosArgs{MaxLen: 3, Rank: -1}).Val())
+ require.Equal(t, []int64{6}, rdb.LPosCount(ctx,
"mylist", "c", 0, redis.LPosArgs{MaxLen: 7, Rank: 2}).Val())
+ })
+
+ t.Run("LPOS when RANK is greater than matches", func(t
*testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mylist").Err())
+ require.NoError(t, rdb.LPush(ctx, "mylist", "a").Err())
+ require.Empty(t, rdb.LPosCount(ctx, "mylist", "b", 10,
redis.LPosArgs{Rank: 5}).Val())
+ })
+ }
}