This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new f03d48ca feat(stream): add support of the `XAUTOCLAIM` command (#2373)
f03d48ca is described below
commit f03d48caab79aba468957ccc782e47989960e609
Author: Edward Xu <[email protected]>
AuthorDate: Thu Jun 27 18:41:42 2024 +0800
feat(stream): add support of the `XAUTOCLAIM` command (#2373)
---
src/commands/cmd_stream.cc | 94 +++++++
src/types/redis_stream.cc | 160 +++++++++++
src/types/redis_stream.h | 2 +
src/types/redis_stream_base.h | 15 +
tests/gocase/unit/type/stream/stream_test.go | 404 +++++++++++++++++++++++++++
5 files changed, 675 insertions(+)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index ba4f7d98..7faeb9c4 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -18,6 +18,8 @@
*
*/
+#include <algorithm>
+#include <limits>
#include <memory>
#include <stdexcept>
@@ -358,6 +360,97 @@ class CommandXClaim : public Commander {
}
};
+class CommandAutoClaim : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ CommandParser parser(args, 1);
+ key_name_ = GET_OR_RET(parser.TakeStr());
+ group_name_ = GET_OR_RET(parser.TakeStr());
+ consumer_name_ = GET_OR_RET(parser.TakeStr());
+ if (auto parse_status = parser.TakeInt<uint64_t>(); !parse_status.IsOK()) {
+ return {Status::RedisParseErr, "Invalid min-idle-time argument for
XAUTOCLAIM"};
+ } else {
+ options_.min_idle_time_ms = parse_status.GetValue();
+ }
+
+ auto start_str = GET_OR_RET(parser.TakeStr());
+ if (!start_str.empty() && start_str.front() == '(') {
+ options_.exclude_start = true;
+ start_str = start_str.substr(1);
+ }
+ if (!options_.exclude_start && start_str == "-") {
+ options_.start_id = StreamEntryID::Minimum();
+ } else {
+ auto parse_status = ParseRangeStart(start_str, &options_.start_id);
+ if (!parse_status.IsOK()) {
+ return parse_status;
+ }
+ }
+
+ if (parser.EatEqICase("count")) {
+ uint64_t count = GET_OR_RET(parser.TakeInt<uint64_t>());
+ constexpr uint64_t min_count = 1;
+ uint64_t max_count = std::numeric_limits<int64_t>::max() /
+
(std::max(static_cast<uint64_t>(sizeof(StreamEntryID)),
options_.attempts_factors));
+ if (count < min_count || count > max_count) {
+ return {Status::RedisParseErr, "COUNT must be > 0"};
+ }
+ options_.count = count;
+ }
+
+ if (parser.Good() && parser.EatEqICase("justid")) {
+ options_.just_id = true;
+ }
+
+ return Status::OK();
+ }
+
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {
+ redis::Stream stream_db(srv->storage, conn->GetNamespace());
+ StreamAutoClaimResult result;
+ auto s = stream_db.AutoClaim(key_name_, group_name_, consumer_name_,
options_, &result);
+ if (!s.ok()) {
+ if (s.IsNotFound()) {
+ return {Status::RedisExecErr,
+ "NOGROUP No such key '" + key_name_ + "' or consumer group '"
+ group_name_ + "'"};
+ }
+ return {Status::RedisExecErr, s.ToString()};
+ }
+ return sendResults(conn, result, output);
+ }
+
+ private:
+ Status sendResults(Connection *conn, const StreamAutoClaimResult &result,
std::string *output) const {
+ output->append(redis::MultiLen(3));
+ output->append(redis::BulkString(result.next_claim_id));
+ output->append(redis::MultiLen(result.entries.size()));
+ for (const auto &item : result.entries) {
+ if (options_.just_id) {
+ output->append(redis::BulkString(item.key));
+ } else {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(item.key));
+ output->append(redis::MultiLen(item.values.size()));
+ for (const auto &value_item : item.values) {
+ output->append(redis::BulkString(value_item));
+ }
+ }
+ }
+
+ output->append(redis::MultiLen(result.deleted_ids.size()));
+ for (const auto &item : result.deleted_ids) {
+ output->append(redis::BulkString(item));
+ }
+
+ return Status::OK();
+ }
+
+ std::string key_name_;
+ std::string group_name_;
+ std::string consumer_name_;
+ StreamAutoClaimOptions options_;
+};
+
class CommandXGroup : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
@@ -1647,6 +1740,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack",
-4, "write no-dbsize-ch
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write
no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1,
1, 1),
+ MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6,
"write", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2,
2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1,
1, 1),
MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0,
0, 0),
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 8b624a2e..6bf03d34 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -511,6 +511,166 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice
&stream_name, const std::str
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
// Implements XAUTOCLAIM: scans the group's PEL from options.start_id, claims
// entries idle for at least options.min_idle_time_ms for `consumer_name`, and
// removes PEL entries whose stream entry no longer exists (reported via
// result->deleted_ids). result->next_claim_id is the cursor for the next call,
// or "0-0" when the scan reached the end of the PEL.
rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &group_name,
                                  const std::string &consumer_name, const StreamAutoClaimOptions &options,
                                  StreamAutoClaimResult *result) {
  // An exclusive start at the maximum id leaves an empty interval.
  if (options.exclude_start && options.start_id.IsMaximum()) {
    return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
  }

  std::string ns_key = AppendNamespacePrefix(stream_name);
  StreamMetadata metadata(false);

  LockGuard guard(storage_->GetLockManager(), ns_key);
  auto s = GetMetadata(GetOptions{}, ns_key, &metadata);
  if (!s.ok()) {  // not found will be caught by outside with no such key or consumer group
    return s;
  }

  // Load the claiming consumer's metadata, creating the consumer on the fly
  // if it does not exist yet (matches XAUTOCLAIM's auto-create behavior).
  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
  std::string get_consumer_value;
  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (s.IsNotFound()) {
    int created_number = 0;
    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, &created_number);
    if (!s.ok()) {
      return s;
    }
    // Re-read so current_consumer_metadata below starts from the stored value.
    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
    if (!s.ok()) {
      return s;
    }
  }

  StreamConsumerMetadata current_consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
  // Per-previous-owner count of entries taken away, used afterwards to
  // decrement each previous owner's pending_number.
  std::map<std::string, uint64_t> claimed_consumer_entity_count;
  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());

  // Bounded snapshot scan over the PEL key range [start_id, Maximum()).
  LatestSnapShot ss{storage_};
  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
  read_options.snapshot = ss.GetSnapShot();
  rocksdb::Slice lower_bound(prefix_key);
  rocksdb::Slice upper_bound(end_key);
  read_options.iterate_lower_bound = &lower_bound;
  read_options.iterate_upper_bound = &upper_bound;

  auto count = options.count;
  // Scan budget: examine at most attempts_factors * count PEL entries so a
  // single call cannot scan an unbounded PEL.
  uint64_t attempts = options.attempts_factors * count;
  auto now_ms = util::GetTimeStampMS();
  std::vector<StreamEntryID> deleted_entries;
  std::vector<StreamEntry> pending_entries;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisStream);
  batch->PutLogData(log_data.Encode());

  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
  uint64_t total_claimed_count = 0;
  for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0; iter->Next()) {
    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
      std::string tmp_group_name;
      StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
      if (tmp_group_name != group_name) {
        continue;
      }

      // With an exclusive '(' start, skip the start id itself.
      if (options.exclude_start && entry_id == options.start_id) {
        continue;
      }

      attempts--;

      StreamPelEntry penl_entry = decodeStreamPelEntryValue(iter->value().ToString());
      // Not idle long enough to be claimable.
      if ((now_ms - penl_entry.last_delivery_time_ms) < options.min_idle_time_ms) {
        continue;
      }

      auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
      std::string entry_value;
      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &entry_value);
      if (!s.ok()) {
        if (s.IsNotFound()) {
          // The stream entry was XDEL-ed while pending: drop the stale PEL
          // entry and report it. Note the deletion consumes COUNT budget.
          // NOTE(review): the previous owner's pending_number is not
          // decremented here — confirm whether that counter drifts.
          deleted_entries.push_back(entry_id);
          batch->Delete(stream_cf_handle_, iter->key());
          --count;
          continue;
        }
        return s;
      }

      StreamEntry entry(entry_id.ToString(), {});
      // Under JUSTID only the id is returned, so skip decoding the values.
      if (!options.just_id) {
        auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
        if (!rv_status.OK()) {
          return rocksdb::Status::InvalidArgument(rv_status.Msg());
        }
      }

      pending_entries.emplace_back(std::move(entry));
      --count;

      // Only rewrite the PEL entry when ownership actually changes; entries
      // already owned by `consumer_name` are returned but not re-claimed.
      if (penl_entry.consumer_name != consumer_name) {
        ++total_claimed_count;
        claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
        penl_entry.consumer_name = consumer_name;
        penl_entry.last_delivery_time_ms = now_ms;
        // Increment the delivery attempts counter unless JUSTID option provided
        if (!options.just_id) {
          penl_entry.last_delivery_count += 1;
        }
        batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(penl_entry));
      }
    }
  }

  if (total_claimed_count > 0 && !pending_entries.empty()) {
    // Credit the claimed entries to the new owner...
    current_consumer_metadata.pending_number += total_claimed_count;
    current_consumer_metadata.last_attempted_interaction_ms = now_ms;

    batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(current_consumer_metadata));

    // ...and debit each previous owner by the number taken from it.
    for (const auto &[consumer, count] : claimed_consumer_entity_count) {
      std::string tmp_consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer);
      std::string tmp_consumer_value;
      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, tmp_consumer_key, &tmp_consumer_value);
      if (!s.ok()) {
        return s;
      }
      StreamConsumerMetadata tmp_consumer_metadata = decodeStreamConsumerMetadataValue(tmp_consumer_value);
      tmp_consumer_metadata.pending_number -= count;
      batch->Put(stream_cf_handle_, tmp_consumer_key, encodeStreamConsumerMetadataValue(tmp_consumer_metadata));
    }
  }

  // The iterator now sits on the first unprocessed key (if any); the next
  // PEL entry from here becomes the cursor for the caller's next call.
  bool has_next_entry = false;
  for (; iter->Valid(); iter->Next()) {
    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
      has_next_entry = true;
      break;
    }
  }

  if (has_next_entry) {
    std::string tmp_group_name;
    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
    result->next_claim_id = entry_id.ToString();
  } else {
    // End of PEL reached: "0-0" tells the caller the iteration is complete.
    result->next_claim_id = StreamEntryID::Minimum().ToString();
  }

  result->entries = std::move(pending_entries);
  result->deleted_ids.clear();
  result->deleted_ids.reserve(deleted_entries.size());
  std::transform(deleted_entries.cbegin(), deleted_entries.cend(), std::back_inserter(result->deleted_ids),
                 [](const StreamEntryID &id) { return id.ToString(); });

  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
+
rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
const std::string &group_name) {
if (std::isdigit(group_name[0])) {
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index e0546ded..510cbb66 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -55,6 +55,8 @@ class Stream : public SubKeyScanner {
const std::string &consumer_name, uint64_t
min_idle_time_ms,
const std::vector<StreamEntryID> &entry_ids,
const StreamClaimOptions &options,
StreamClaimResult *result);
+ rocksdb::Status AutoClaim(const Slice &stream_name, const std::string
&group_name, const std::string &consumer_name,
+ const StreamAutoClaimOptions &options,
StreamAutoClaimResult *result);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions
&options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t
count, StreamInfo *info);
rocksdb::Status GetGroupInfo(const Slice &stream_name,
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 889e4046..82c8f945 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -173,6 +173,15 @@ struct StreamClaimOptions {
StreamEntryID last_delivered_id;
};
+struct StreamAutoClaimOptions {
+ uint64_t min_idle_time_ms;
+ uint64_t count = 100;
+ uint64_t attempts_factors = 10;
+ StreamEntryID start_id;
+ bool just_id = false;
+ bool exclude_start = false;
+};
+
struct StreamConsumerGroupMetadata {
uint64_t consumer_number = 0;
uint64_t pending_number = 0;
@@ -224,6 +233,12 @@ struct StreamClaimResult {
std::vector<StreamEntry> entries;
};
// Result of Stream::AutoClaim, mirroring the three-part XAUTOCLAIM reply.
struct StreamAutoClaimResult {
  // Cursor for the next XAUTOCLAIM call; "0-0" when the PEL scan finished.
  std::string next_claim_id;
  // Entries claimed for the consumer (values left empty under JUSTID).
  std::vector<StreamEntry> entries;
  // Ids removed from the PEL because their stream entry no longer exists.
  std::vector<std::string> deleted_ids;
};
+
Status IncrementStreamEntryID(StreamEntryID *id);
Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>>
ParseNextStreamEntryIDStrategy(const std::string &input);
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 7297bb66..b8958e70 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1500,6 +1500,410 @@ func TestStreamOffset(t *testing.T) {
require.Len(t, claimedIDs, 1, "Expected to claim exactly one
message ID")
require.Equal(t, "1-0", claimedIDs[0], "Expected claimed
message ID to match")
})
+
// Verifies that XAUTOCLAIM transfers pending (PEL) entries from one consumer
// to another once they have been idle longer than min-idle-time.
t.Run("XAUTOCLAIM can claim PEL items from another consume", func(t *testing.T) {

	streamName := "mystream"
	groupName := "mygroup"
	var id1 string
	require.NoError(t, rdb.Del(ctx, streamName).Err())
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"a", "1"},
		})
		require.NoError(t, rsp.Err())
		id1 = rsp.Val()
	}
	var id2 string
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"b", "2"},
		})
		require.NoError(t, rsp.Err())
		id2 = rsp.Val()
	}
	var id3 string
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"c", "3"},
		})
		require.NoError(t, rsp.Err())
		id3 = rsp.Val()
	}
	var id4 string
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"d", "4"},
		})
		require.NoError(t, rsp.Err())
		id4 = rsp.Val()
	}

	require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())

	consumer1 := "consumer1"
	consumer2 := "consumer2"
	// consumer1 reads one entry, placing id1 into its PEL.
	{
		rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumer1,
			Streams:  []string{streamName, ">"},
			Count:    1,
		})
		require.NoError(t, rsp.Err())
		require.Len(t, rsp.Val(), 1)
		require.Len(t, rsp.Val()[0].Messages, 1)
		require.Equal(t, id1, rsp.Val()[0].Messages[0].ID)
		require.Len(t, rsp.Val()[0].Messages[0].Values, 1)
		require.Equal(t, "1", rsp.Val()[0].Messages[0].Values["a"])
	}

	// After the idle threshold passes, consumer2 can claim id1; a "0-0"
	// next-start cursor means the PEL scan is complete.
	{
		time.Sleep(200 * time.Millisecond)
		rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: consumer2,
			MinIdle:  10 * time.Millisecond,
			Count:    1,
			Start:    "-",
		})
		require.NoError(t, rsp.Err())
		msgs, start := rsp.Val()
		require.Equal(t, "0-0", start)
		require.Len(t, msgs, 1)
		require.Len(t, msgs[0].Values, 1)
		require.Equal(t, "1", msgs[0].Values["a"])
	}

	// consumer1 reads the remaining entries, then id2 is deleted from the
	// stream while still sitting in the PEL.
	{
		rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumer1,
			Streams:  []string{streamName, ">"},
			Count:    3,
		})
		require.NoError(t, rsp.Err())

		time.Sleep(time.Millisecond * 200)
		require.NoError(t, rdb.XDel(ctx, streamName, id2).Err())
	}

	// Raw reply check: claimed entries come back as (id, values) pairs, the
	// deleted id2 is reported in the third element, and id4 is the cursor
	// for the next call (the deleted entry consumed part of COUNT 3).
	{
		cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, consumer2, 10, "-", "COUNT", 3)
		require.NoError(t, cmd.Err())
		require.Equal(t, []interface{}{
			id4,
			[]interface{}{
				[]interface{}{
					id1,
					[]interface{}{"a", "1"},
				},
				[]interface{}{
					id3,
					[]interface{}{"c", "3"},
				},
			},
			[]interface{}{
				id2,
			},
		}, cmd.Val())
	}

	// JUSTID variant returns only the claimed ids; the freshly deleted id4
	// is dropped from the PEL rather than returned as a message.
	{
		time.Sleep(time.Millisecond * 200)
		require.NoError(t, rdb.XDel(ctx, streamName, id4).Err())
		rsp := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: consumer2,
			MinIdle:  10 * time.Millisecond,
			Start:    "-",
		})
		require.NoError(t, rsp.Err())
		msgs, start := rsp.Val()
		require.Equal(t, "0-0", start)
		require.Len(t, msgs, 2)
		require.Equal(t, id1, msgs[0])
		require.Equal(t, id3, msgs[1])
	}
})
+
// Drives XAUTOCLAIM as a cursor-based iterator: each call resumes from the
// start id returned by the previous call until "0-0" signals completion.
t.Run("XAUTOCLAIM as an iterator", func(t *testing.T) {
	streamName := "mystream"
	groupName := "mygroup"
	var id3, id5 string
	require.NoError(t, rdb.Del(ctx, streamName).Err())
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"a", "1"},
		})
		require.NoError(t, rsp.Err())
	}
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"b", "2"},
		})
		require.NoError(t, rsp.Err())
	}
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"c", "3"},
		})
		require.NoError(t, rsp.Err())
		id3 = rsp.Val()
	}
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"d", "4"},
		})
		require.NoError(t, rsp.Err())
	}
	{
		rsp := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			ID:     "*",
			Values: []string{"e", "5"},
		})
		require.NoError(t, rsp.Err())
		id5 = rsp.Val()
	}
	require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())

	consumer1, consumer2 := "consumer1", "consumer2"
	// consumer1 takes all five entries into its PEL, then idles past MinIdle.
	{
		rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumer1,
			Streams:  []string{streamName, ">"},
			Count:    90,
		})
		require.NoError(t, rsp.Err())
		time.Sleep(200 * time.Millisecond)
	}
	// First page: two entries claimed, cursor advances to id3.
	{
		rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: consumer2,
			MinIdle:  10 * time.Millisecond,
			Count:    2,
			Start:    "-",
		})
		require.NoError(t, rsp.Err())
		msgs, start := rsp.Val()
		require.Equal(t, id3, start)
		require.Len(t, msgs, 2)
		require.Len(t, msgs[0].Values, 1)
		require.Equal(t, "1", msgs[0].Values["a"])
	}

	// Second page starts at the returned cursor.
	{
		rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: consumer2,
			MinIdle:  10 * time.Millisecond,
			Start:    id3,
			Count:    2,
		})
		require.NoError(t, rsp.Err())
		msgs, start := rsp.Val()
		require.Equal(t, id5, start)
		require.Len(t, msgs, 2)
		require.Len(t, msgs[0].Values, 1)
		require.Equal(t, "3", msgs[0].Values["c"])
	}

	// Final page: "0-0" marks the end of the PEL scan.
	{
		rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: consumer2,
			MinIdle:  10 * time.Millisecond,
			Start:    id5,
			Count:    1,
		})
		require.NoError(t, rsp.Err())
		msgs, start := rsp.Val()
		require.Equal(t, "0-0", start)
		require.Len(t, msgs, 1)
		require.Len(t, msgs[0].Values, 1)
		require.Equal(t, "5", msgs[0].Values["e"])
	}
})
+
// XAUTOCLAIM must handle stream entries that were XDEL-ed while still
// pending: they are returned in the deleted-ids list, not as messages.
t.Run("XAUTOCLAIM with XDEL", func(t *testing.T) {
	streamName := "x"
	groupName := "grp"
	require.NoError(t, rdb.Del(ctx, streamName).Err())
	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "1-0",
		Values: map[string]interface{}{"f": "v"},
	}).Err())
	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "2-0",
		Values: map[string]interface{}{"f": "v"},
	}).Err())
	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "3-0",
		Values: map[string]interface{}{"f": "v"},
	}).Err())
	require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
	// Alice reads all three entries into her PEL.
	{
		rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: "Alice",
			Streams:  []string{streamName, ">"},
		})
		require.NoError(t, rsp.Err())
		require.Len(t, rsp.Val(), 1)
		require.Len(t, rsp.Val()[0].Messages, 3)
		require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
		require.Equal(t, "v", rsp.Val()[0].Messages[0].Values["f"])
		require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
		require.Equal(t, "v", rsp.Val()[0].Messages[1].Values["f"])
		require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
		require.Equal(t, "v", rsp.Val()[0].Messages[2].Values["f"])
	}
	// Delete 2-0, then let Bob claim with min-idle-time 0: 1-0 and 3-0 are
	// claimed, while 2-0 lands in the deleted-ids section of the reply.
	{
		require.NoError(t, rdb.XDel(ctx, streamName, "2-0").Err())
		cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "0-0")
		require.NoError(t, cmd.Err())
		require.Equal(t, []interface{}{
			"0-0",
			[]interface{}{
				[]interface{}{
					"1-0",
					[]interface{}{"f", "v"},
				},
				[]interface{}{
					"3-0",
					[]interface{}{"f", "v"},
				},
			},
			[]interface{}{
				"2-0",
			},
		}, cmd.Val())
	}
})
+
// Same as the XDEL case but with COUNT 1, verifying that deleted entries
// consume the COUNT budget and the cursor still advances correctly.
t.Run("XAUTOCLAIM with XDEL and count", func(t *testing.T) {
	streamName := "x"
	groupName := "grp"
	require.NoError(t, rdb.Del(ctx, streamName).Err())
	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "1-0",
		Values: map[string]interface{}{"f": "v"},
	}).Err())
	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "2-0",
		Values: map[string]interface{}{"f": "v"},
	}).Err())
	require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: streamName,
		ID:     "3-0",
		Values: map[string]interface{}{"f": "v"},
	}).Err())
	require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
	// Alice reads all three entries into her PEL.
	{
		rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: "Alice",
			Streams:  []string{streamName, ">"},
		})
		require.NoError(t, rsp.Err())
		require.Len(t, rsp.Val(), 1)
		require.Len(t, rsp.Val()[0].Messages, 3)
		require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
		require.Equal(t, "v", rsp.Val()[0].Messages[0].Values["f"])
		require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
		require.Equal(t, "v", rsp.Val()[0].Messages[1].Values["f"])
		require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
		require.Equal(t, "v", rsp.Val()[0].Messages[2].Values["f"])
	}
	// 1-0 and 2-0 are deleted; COUNT 1 processes only 1-0, reporting it as
	// deleted and leaving the cursor at 2-0.
	{
		require.NoError(t, rdb.XDel(ctx, streamName, "1-0").Err())
		require.NoError(t, rdb.XDel(ctx, streamName, "2-0").Err())
		cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "0-0", "COUNT", 1)
		require.NoError(t, cmd.Err())
		require.Equal(t, []interface{}{
			"2-0",
			[]interface{}{},
			[]interface{}{
				"1-0",
			},
		}, cmd.Val())
	}
	// Next page reports the deleted 2-0 and moves the cursor to 3-0.
	{
		cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "2-0", "COUNT", 1)
		require.NoError(t, cmd.Err())
		require.Equal(t, []interface{}{
			"3-0",
			[]interface{}{},
			[]interface{}{
				"2-0",
			},
		}, cmd.Val())
	}
	// Final page claims the only surviving entry, 3-0, and finishes ("0-0").
	{
		cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "3-0", "COUNT", 1)
		require.NoError(t, cmd.Err())
		require.Equal(t, []interface{}{
			"0-0",
			[]interface{}{
				[]interface{}{
					"3-0",
					[]interface{}{"f", "v"},
				},
			},
			[]interface{}{},
		}, cmd.Val())
	}
	// assert_equal [XPENDING x grp - + 10 Alice] {}
	// add xpending to this test case when it is supported
})
+
// A COUNT beyond the server-side ceiling must be rejected; only the error
// prefix is asserted since the message may mention the computed limit.
t.Run("XAUTOCLAIM with out of range count", func(t *testing.T) {
	err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
		Stream:   "x",
		Group:    "grp",
		Consumer: "Bob",
		MinIdle:  0,
		Start:    "3-0",
		// Deliberately larger than the maximum COUNT the parser accepts.
		Count:    8070450532247928833,
	}).Err()
	require.Error(t, err)
	require.True(t, strings.HasPrefix(err.Error(), "ERR COUNT"))
})
+
// COUNT 0 is invalid and must produce the exact server error message.
t.Run("XAUTOCLAIM COUNT must be > 0", func(t *testing.T) {
	cmd := rdb.Do(ctx, "XAUTOCLAIM", "key", "group", "consumer", 1, 1, "COUNT", 0)
	require.Error(t, cmd.Err())
	require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error())
})
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {