This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new a0dae44d Add support of the command XCLAIM (#2202)
a0dae44d is described below
commit a0dae44d2f2cbf37fdeecbffcb3e16b7b2c3ff12
Author: Rebecca Zhou <[email protected]>
AuthorDate: Mon May 13 05:53:28 2024 -0400
Add support of the command XCLAIM (#2202)
Co-authored-by: 纪华裕 <[email protected]>
---
src/commands/cmd_stream.cc | 123 +++++++++++++++++++
src/types/redis_stream.cc | 132 +++++++++++++++++++++
src/types/redis_stream.h | 4 +
src/types/redis_stream_base.h | 17 +++
tests/gocase/unit/type/stream/stream_test.go | 171 +++++++++++++++++++++++++++
5 files changed, 447 insertions(+)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index a5fdf795..8468ee30 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -242,6 +242,128 @@ class CommandXDel : public Commander {
std::vector<redis::StreamEntryID> ids_;
};
+class CommandXClaim : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 6) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+
+ CommandParser parser(args, 1);
+ stream_name_ = GET_OR_RET(parser.TakeStr());
+ group_name_ = GET_OR_RET(parser.TakeStr());
+ consumer_name_ = GET_OR_RET(parser.TakeStr());
+ auto parse_result = parser.TakeInt<int64_t>();
+ if (!parse_result.IsOK()) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+ min_idle_time_ms_ = parse_result.GetValue();
+ if (min_idle_time_ms_ < 0) {
+ min_idle_time_ms_ = 0;
+ }
+
+ while (parser.Good() && !isOption(parser.RawPeek())) {
+ auto raw_id = GET_OR_RET(parser.TakeStr());
+ redis::StreamEntryID id;
+ auto s = ParseStreamEntryID(raw_id, &id);
+ if (!s.IsOK()) {
+ return s;
+ }
+ entry_ids_.emplace_back(id);
+ }
+
+ while (parser.Good()) {
+ if (parser.EatEqICase("idle")) {
+ auto parse_result = parser.TakeInt<int64_t>();
+ if (!parse_result.IsOK()) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+ if (parse_result.GetValue() < 0) {
+ return {Status::RedisParseErr, "IDLE for XCLAIM must be
non-negative"};
+ }
+ stream_claim_options_.idle_time_ms = parse_result.GetValue();
+ } else if (parser.EatEqICase("time")) {
+ auto parse_result = parser.TakeInt<int64_t>();
+ if (!parse_result.IsOK()) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+ if (parse_result.GetValue() < 0) {
+ return {Status::RedisParseErr, "TIME for XCLAIM must be
non-negative"};
+ }
+ stream_claim_options_.with_time = true;
+ stream_claim_options_.last_delivery_time_ms = parse_result.GetValue();
+ } else if (parser.EatEqICase("retrycount")) {
+ auto parse_result = parser.TakeInt<int64_t>();
+ if (!parse_result.IsOK()) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+ if (parse_result.GetValue() < 0) {
+ return {Status::RedisParseErr, "RETRYCOUNT for XCLAIM must be
non-negative"};
+ }
+ stream_claim_options_.with_retry_count = true;
+ stream_claim_options_.last_delivery_count = parse_result.GetValue();
+ } else if (parser.EatEqICase("force")) {
+ stream_claim_options_.force = true;
+ } else if (parser.EatEqICase("justid")) {
+ stream_claim_options_.just_id = true;
+ } else if (parser.EatEqICase("lastid")) {
+ auto last_id = GET_OR_RET(parser.TakeStr());
+ auto s = ParseStreamEntryID(last_id,
&stream_claim_options_.last_delivered_id);
+ if (!s.IsOK()) {
+ return s;
+ }
+ } else {
+ return parser.InvalidSyntax();
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {
+ redis::Stream stream_db(srv->storage, conn->GetNamespace());
+ StreamClaimResult result;
+ auto s = stream_db.ClaimPelEntries(stream_name_, group_name_,
consumer_name_, min_idle_time_ms_, entry_ids_,
+ stream_claim_options_, &result);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ if (s.IsNotFound()) {
+ return {Status::RedisExecErr, errNoSuchKey};
+ }
+
+ if (!stream_claim_options_.just_id) {
+ output->append(redis::MultiLen(result.entries.size()));
+
+ for (const auto &e : result.entries) {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(e.key));
+ output->append(conn->MultiBulkString(e.values));
+ }
+ } else {
+ output->append(redis::MultiLen(result.ids.size()));
+ for (const auto &id : result.ids) {
+ output->append(redis::BulkString(id));
+ }
+ }
+
+ return Status::OK();
+ }
+
+ private:
+ std::string stream_name_;
+ std::string group_name_;
+ std::string consumer_name_;
+ uint64_t min_idle_time_ms_;
+ std::vector<StreamEntryID> entry_ids_;
+ StreamClaimOptions stream_claim_options_;
+
+ bool static isOption(const std::string &arg) {
+ static const std::unordered_set<std::string> options = {"idle", "time",
"retrycount", "force", "justid", "lastid"};
+ return options.find(util::ToLower(arg)) != options.end();
+ }
+};
+
class CommandXGroup : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
@@ -1534,6 +1656,7 @@ class CommandXSetId : public Commander {
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write
no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write
no-dbsize-check", 1, 1, 1),
+ MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1,
1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2,
2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1,
1, 1),
MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0,
0, 0),
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 6c66f800..a3bdc63b 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -357,6 +357,138 @@ rocksdb::Status Stream::DeletePelEntries(const Slice
&stream_name, const std::st
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
// Implements XCLAIM: transfers ownership of the given pending entries (PEL
// entries) of `group_name` to `consumer_name`, provided they have been idle
// for at least `min_idle_time_ms`. Claimed entries (or just their ids when
// options.just_id is set) are reported through *result. All metadata updates
// are applied atomically via a single write batch.
rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::string &group_name,
                                        const std::string &consumer_name, const uint64_t min_idle_time_ms,
                                        const std::vector<StreamEntryID> &entry_ids, const StreamClaimOptions &options,
                                        StreamClaimResult *result) {
  std::string ns_key = AppendNamespacePrefix(stream_name);
  LockGuard guard(storage_->GetLockManager(), ns_key);
  StreamMetadata metadata(false);
  rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
  if (!s.ok()) return s;

  // The consumer group must already exist; XCLAIM never creates one.
  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
  std::string get_group_value;
  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (s.IsNotFound()) {
    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " + group_name + " for key name " +
                                            stream_name.ToString());
  }
  StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);

  // The claiming consumer is auto-created if it does not exist yet.
  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
  std::string get_consumer_value;
  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (s.IsNotFound()) {
    int created_number = 0;
    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, &created_number);
    if (!s.ok()) {
      return s;
    }
    group_metadata.consumer_number += created_number;
  }
  StreamConsumerMetadata consumer_metadata;
  // NOTE(review): after a successful createConsumerWithoutLock above, `s` has
  // been reassigned to ok(), so for a freshly created consumer this decodes
  // the still-empty get_consumer_value — confirm that decoding an empty
  // string yields default-initialized metadata.
  if (!s.IsNotFound()) {
    consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
  }
  auto now = util::GetTimeStampMS();
  consumer_metadata.last_idle_ms = now;
  consumer_metadata.last_active_ms = now;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisStream);
  batch->PutLogData(log_data.Encode());

  for (const auto &id : entry_ids) {
    // Entries deleted from the stream itself are silently skipped.
    std::string raw_value;
    rocksdb::Status s = getEntryRawValue(ns_key, metadata, id, &raw_value);
    if (!s.ok() && !s.IsNotFound()) {
      return s;
    }
    if (s.IsNotFound()) continue;

    std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
    std::string value;
    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &value);
    StreamPelEntry pel_entry;

    // FORCE lets us claim an entry that has no PEL record by synthesizing an
    // empty one; this also grows the group's pending counter.
    if (!s.ok() && s.IsNotFound() && options.force) {
      pel_entry = {0, 0, ""};
      group_metadata.pending_number += 1;
    }

    if (s.ok()) {
      pel_entry = decodeStreamPelEntryValue(value);
    }

    if (s.ok() || (s.IsNotFound() && options.force)) {
      // Skip entries that have not been idle long enough.
      if (now - pel_entry.last_delivery_time_ms < min_idle_time_ms) continue;

      std::vector<std::string> values;
      if (options.just_id) {
        result->ids.emplace_back(id.ToString());
      } else {
        auto rv = DecodeRawStreamEntryValue(raw_value, &values);
        if (!rv.IsOK()) {
          return rocksdb::Status::InvalidArgument(rv.Msg());
        }
        result->entries.emplace_back(id.ToString(), std::move(values));
      }

      // Move the pending count from the previous owner (if any) to the new
      // consumer.
      if (pel_entry.consumer_name != "") {
        std::string original_consumer_key =
            internalKeyFromConsumerName(ns_key, metadata, group_name, pel_entry.consumer_name);
        std::string get_original_consumer_value;
        s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, original_consumer_key,
                          &get_original_consumer_value);
        if (!s.ok()) {
          return s;
        }
        StreamConsumerMetadata original_consumer_metadata =
            decodeStreamConsumerMetadataValue(get_original_consumer_value);
        original_consumer_metadata.pending_number -= 1;
        batch->Put(stream_cf_handle_, original_consumer_key,
                   encodeStreamConsumerMetadataValue(original_consumer_metadata));
      }

      pel_entry.consumer_name = consumer_name;
      consumer_metadata.pending_number += 1;
      // TIME sets an absolute delivery timestamp; otherwise IDLE (default 0)
      // sets it relative to now.
      if (options.with_time) {
        pel_entry.last_delivery_time_ms = options.last_delivery_time_ms;
      } else {
        pel_entry.last_delivery_time_ms = now - options.idle_time_ms;
      }

      // Reset out-of-range delivery times to now.
      // NOTE(review): if last_delivery_time_ms is unsigned, the `< 0` test is
      // always false; an idle_time_ms larger than `now` wraps around and is
      // caught by the `> now` test instead — confirm this is intended.
      if (pel_entry.last_delivery_time_ms < 0 || pel_entry.last_delivery_time_ms > now) {
        pel_entry.last_delivery_time_ms = now;
      }

      if (options.with_retry_count) {
        pel_entry.last_delivery_count = options.last_delivery_count;
      } else if (!options.just_id) {
        // JUSTID intentionally leaves the retry counter untouched.
        pel_entry.last_delivery_count += 1;
      }

      std::string pel_value = encodeStreamPelEntryValue(pel_entry);
      batch->Put(stream_cf_handle_, entry_key, pel_value);
    }
  }

  // LASTID may only move the group's last-delivered id forward.
  if (options.with_last_id && options.last_delivered_id > group_metadata.last_delivered_id) {
    group_metadata.last_delivered_id = options.last_delivered_id;
  }

  batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
  batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_metadata));
  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
+
rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
const std::string &group_name) {
if (std::isdigit(group_name[0])) {
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 662f93f7..2e867a85 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -51,6 +51,10 @@ class Stream : public SubKeyScanner {
rocksdb::Status DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
rocksdb::Status DeletePelEntries(const Slice &stream_name, const std::string
&group_name,
const std::vector<StreamEntryID>
&entry_ids, uint64_t *acknowledged);
+ rocksdb::Status ClaimPelEntries(const Slice &stream_name, const std::string
&group_name,
+ const std::string &consumer_name, uint64_t
min_idle_time_ms,
+ const std::vector<StreamEntryID> &entry_ids,
const StreamClaimOptions &options,
+ StreamClaimResult *result);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions
&options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t
count, StreamInfo *info);
rocksdb::Status GetGroupInfo(const Slice &stream_name,
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index ae90a3e5..efea48ec 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -161,6 +161,18 @@ struct StreamXGroupCreateOptions {
std::string last_id;
};
+struct StreamClaimOptions {
+ uint64_t idle_time_ms = 0;
+ bool with_time = false;
+ bool with_retry_count = false;
+ bool force = false;
+ bool just_id = false;
+ bool with_last_id = false;
+ uint64_t last_delivery_time_ms;
+ uint64_t last_delivery_count;
+ StreamEntryID last_delivered_id;
+};
+
struct StreamConsumerGroupMetadata {
uint64_t consumer_number = 0;
uint64_t pending_number = 0;
@@ -207,6 +219,11 @@ struct StreamReadResult {
: name(std::move(name)), entries(std::move(result)) {}
};
/// Result of an XCLAIM call. Exactly one of the two vectors is populated,
/// depending on whether the JUSTID option was used.
struct StreamClaimResult {
  // Entry ids (string form) of claimed entries; filled when JUSTID is set.
  std::vector<std::string> ids;
  // Full claimed entries (id plus field/value list); filled otherwise.
  std::vector<StreamEntry> entries;
};
+
Status IncrementStreamEntryID(StreamEntryID *id);
Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>>
ParseNextStreamEntryIDStrategy(const std::string &input);
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index ad6189db..d6a09a9e 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1294,6 +1294,177 @@ func TestStreamOffset(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(3), r)
})
+
// Basic claim flow: deliver an entry to one consumer's PEL, then verify that
// XCLAIM transfers it to another consumer while honoring MinIdle.
t.Run("Simple XCLAIM command tests", func(t *testing.T) {
	streamName := "mystream"
	groupName := "mygroup"
	consumerName := "myconsumer"
	consumer1Name := "myconsumer1"
	require.NoError(t, rdb.Del(ctx, streamName).Err())

	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "1-0",
		Values: []string{"field1", "data1"},
	}).Err())
	require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
	// Read entry 1-0 with myconsumer so it lands in that consumer's PEL.
	r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    groupName,
		Consumer: consumerName,
		Streams:  []string{streamName, ">"},
		Count:    1,
		NoAck:    false,
	}).Result()
	require.NoError(t, err)
	require.Equal(t, []redis.XStream{{
		Stream:   streamName,
		Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}},
	}}, r)

	// MinIdle 0 always matches: the entry moves to myconsumer1.
	claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
		Stream:   streamName,
		Group:    groupName,
		Consumer: consumer1Name,
		MinIdle:  0,
		Messages: []string{"1-0"},
	}).Result()
	require.NoError(t, err)
	require.Len(t, claimedMessages, 1, "Expected to claim 1 message")
	require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match")

	// Sleep past MinIdle so the entry is idle long enough to be claimed back.
	time.Sleep(2000 * time.Millisecond)
	minIdleTime := 1000 * time.Millisecond
	claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
		Stream:   streamName,
		Group:    groupName,
		Consumer: consumerName,
		MinIdle:  minIdleTime,
		Messages: []string{"1-0"},
	}).Result()
	require.NoError(t, err)
	require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough")
	require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match")

	// A MinIdle far larger than the actual idle time must claim nothing.
	minIdleTime = 60000 * time.Millisecond
	claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
		Stream:   streamName,
		Group:    groupName,
		Consumer: consumer1Name,
		MinIdle:  minIdleTime,
		Messages: []string{"1-0"},
	}).Result()

	require.NoError(t, err)
	require.Empty(t, claimedMessages, "Expected no messages to be claimed due to insufficient idle time")
})
+
// Timing options: verify that IDLE and TIME rewrite the entry's
// last-delivery timestamp so that later MinIdle-based claims succeed.
t.Run("XCLAIM with different timing situations and options", func(t *testing.T) {
	streamName := "mystream"
	groupName := "mygroup"
	consumerName := "myconsumer"
	consumer1Name := "myconsumer1"
	require.NoError(t, rdb.Del(ctx, streamName).Err())

	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "1-0",
		Values: []string{"field1", "data1"},
	}).Err())
	require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
	// Deliver entry 1-0 to myconsumer so it is pending.
	r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    groupName,
		Consumer: consumerName,
		Streams:  []string{streamName, ">"},
		Count:    1,
		NoAck:    false,
	}).Result()
	require.NoError(t, err)
	require.Equal(t, []redis.XStream{{
		Stream:   streamName,
		Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}},
	}}, r)

	// Claim with IDLE 5000: the entry is recorded as already idle for 5s.
	// Raw Do() is used because the go-redis XClaim helper has no IDLE arg.
	rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName, groupName, consumer1Name, "0", "1-0", "IDLE", "5000").Result()
	require.NoError(t, err)
	messages, ok := rawClaimedMessages.([]interface{})
	require.True(t, ok, "Expected the result to be a slice of interface{}")
	firstMsg, ok := messages[0].([]interface{})
	require.True(t, ok, "Expected message details to be a slice of interface{}")
	msgID, ok := firstMsg[0].(string)
	require.True(t, ok, "Expected message ID to be a string")
	require.Equal(t, "1-0", msgID, "Expected claimed message ID to match")

	// The 5s artificial idle time satisfies MinIdle=2s without sleeping.
	claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
		Stream:   streamName,
		Group:    groupName,
		Consumer: consumerName,
		MinIdle:  2000 * time.Millisecond,
		Messages: []string{"1-0"},
	}).Result()
	require.NoError(t, err)
	require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough")
	require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match")

	// Claim with TIME set to an absolute timestamp 10s in the past.
	tenSecondsAgo := time.Now().Add(-10 * time.Second).UnixMilli()
	rawClaimedMessages, err = rdb.Do(ctx, "XCLAIM", streamName, groupName, consumer1Name, "0", "1-0", "TIME", tenSecondsAgo).Result()
	require.NoError(t, err)
	messages, ok = rawClaimedMessages.([]interface{})
	require.True(t, ok, "Expected the result to be a slice of interface{}")
	firstMsg, ok = messages[0].([]interface{})
	require.True(t, ok, "Expected message details to be a slice of interface{}")
	msgID, ok = firstMsg[0].(string)
	require.True(t, ok, "Expected message ID to be a string")
	require.Equal(t, "1-0", msgID, "Expected claimed message ID to match")

	// The 10s-old timestamp satisfies MinIdle=5s.
	claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
		Stream:   streamName,
		Group:    groupName,
		Consumer: consumerName,
		MinIdle:  5000 * time.Millisecond,
		Messages: []string{"1-0"},
	}).Result()
	require.NoError(t, err)
	require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough")
	require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match")
})
+
// FORCE and JUSTID options: FORCE claims an entry that was never delivered
// (no PEL record); JUSTID returns only the entry ids.
t.Run("XCLAIM command with different options", func(t *testing.T) {
	streamName := "mystream"
	groupName := "mygroup"
	consumerName := "myconsumer"
	consumer1Name := "myconsumer1"

	require.NoError(t, rdb.Del(ctx, streamName).Err())
	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "1-0",
		Values: []string{"field1", "data1"},
	}).Err())
	require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())

	// Entry 1-0 was never read by the group, so a plain XCLAIM would find no
	// PEL record; FORCE creates one and claims it. Raw Do() is used because
	// the go-redis helper has no FORCE arg.
	rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName, groupName, consumerName, "0", "1-0", "FORCE").Result()
	require.NoError(t, err)
	messages, ok := rawClaimedMessages.([]interface{})
	require.True(t, ok, "Expected the result to be a slice of interface{}")
	firstMsg, ok := messages[0].([]interface{})
	require.True(t, ok, "Expected message details to be a slice of interface{}")
	msgID, ok := firstMsg[0].(string)
	require.True(t, ok, "Expected message ID to be a string")
	require.Equal(t, "1-0", msgID, "Expected claimed message ID to match")

	// JUSTID variant: the reply is a flat list of ids, not full entries.
	cmd := rdb.XClaimJustID(ctx, &redis.XClaimArgs{
		Stream:   streamName,
		Group:    groupName,
		Consumer: consumer1Name,
		MinIdle:  0,
		Messages: []string{"1-0"},
	})

	claimedIDs, err := cmd.Result()
	require.NoError(t, err)
	require.Len(t, claimedIDs, 1, "Expected to claim exactly one message ID")
	require.Equal(t, "1-0", claimedIDs[0], "Expected claimed message ID to match")
})
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {