This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new f03d48ca feat(stream): add support of the `XAUTOCLAIM` command (#2373)
f03d48ca is described below

commit f03d48caab79aba468957ccc782e47989960e609
Author: Edward Xu <[email protected]>
AuthorDate: Thu Jun 27 18:41:42 2024 +0800

    feat(stream): add support of the `XAUTOCLAIM` command (#2373)
---
 src/commands/cmd_stream.cc                   |  94 +++++++
 src/types/redis_stream.cc                    | 160 +++++++++++
 src/types/redis_stream.h                     |   2 +
 src/types/redis_stream_base.h                |  15 +
 tests/gocase/unit/type/stream/stream_test.go | 404 +++++++++++++++++++++++++++
 5 files changed, 675 insertions(+)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index ba4f7d98..7faeb9c4 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -18,6 +18,8 @@
  *
  */
 
+#include <algorithm>
+#include <limits>
 #include <memory>
 #include <stdexcept>
 
@@ -358,6 +360,97 @@ class CommandXClaim : public Commander {
   }
 };
 
+// Handler for `XAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]`.
+//
+// Parse() fills a StreamAutoClaimOptions from the arguments; Execute() hands them to
+// redis::Stream::AutoClaim and serializes the result as a three-element array:
+// the next start ID, the claimed entries and the IDs dropped from the PEL.
+class CommandAutoClaim : public Commander {
+ public:
+  // Parses the arguments that follow the command name (parsing starts at args[1]).
+  //
+  // Returns RedisParseErr when min-idle-time is not an unsigned integer, when COUNT is
+  // outside the accepted range, or when an unknown trailing option is present; errors
+  // from ParseRangeStart are propagated unchanged.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    key_name_ = GET_OR_RET(parser.TakeStr());
+    group_name_ = GET_OR_RET(parser.TakeStr());
+    consumer_name_ = GET_OR_RET(parser.TakeStr());
+
+    auto min_idle = parser.TakeInt<uint64_t>();
+    if (!min_idle.IsOK()) {
+      return {Status::RedisParseErr, "Invalid min-idle-time argument for XAUTOCLAIM"};
+    }
+    options_.min_idle_time_ms = min_idle.GetValue();
+
+    // A leading '(' marks the start ID as exclusive.
+    auto start_str = GET_OR_RET(parser.TakeStr());
+    if (!start_str.empty() && start_str.front() == '(') {
+      options_.exclude_start = true;
+      start_str = start_str.substr(1);
+    }
+    if (!options_.exclude_start && start_str == "-") {
+      options_.start_id = StreamEntryID::Minimum();
+    } else {
+      auto parse_status = ParseRangeStart(start_str, &options_.start_id);
+      if (!parse_status.IsOK()) {
+        return parse_status;
+      }
+    }
+
+    // COUNT and JUSTID may appear in any order. Anything else is rejected instead of
+    // being silently ignored, so a misspelled option cannot change the command's meaning.
+    while (parser.Good()) {
+      if (parser.EatEqICase("count")) {
+        uint64_t count = GET_OR_RET(parser.TakeInt<uint64_t>());
+        constexpr uint64_t min_count = 1;
+        // Bound count so that multiplying it by either sizeof(StreamEntryID) or
+        // attempts_factors stays within the int64_t range.
+        uint64_t max_count = std::numeric_limits<int64_t>::max() /
+                             (std::max(static_cast<uint64_t>(sizeof(StreamEntryID)), options_.attempts_factors));
+        if (count < min_count || count > max_count) {
+          return {Status::RedisParseErr, "COUNT must be > 0"};
+        }
+        options_.count = count;
+      } else if (parser.EatEqICase("justid")) {
+        options_.just_id = true;
+      } else {
+        return {Status::RedisParseErr, "syntax error"};
+      }
+    }
+
+    return Status::OK();
+  }
+
+  // Runs the claim against the stream in the connection's namespace and writes the reply
+  // to `output`. A NotFound status from the storage layer is reported as NOGROUP; any
+  // other failure is returned with the storage status text.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Stream stream_db(srv->storage, conn->GetNamespace());
+    StreamAutoClaimResult result;
+    auto s = stream_db.AutoClaim(key_name_, group_name_, consumer_name_, options_, &result);
+    if (!s.ok()) {
+      if (s.IsNotFound()) {
+        return {Status::RedisExecErr,
+                "NOGROUP No such key '" + key_name_ + "' or consumer group '" + group_name_ + "'"};
+      }
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    return sendResults(conn, result, output);
+  }
+
+ private:
+  // Appends the reply to `output`: [next_claim_id, entries, deleted_ids].
+  // With JUSTID each entry is a bare ID; otherwise it is a pair of ID and value list.
+  Status sendResults(Connection *conn, const StreamAutoClaimResult &result, std::string *output) const {
+    output->append(redis::MultiLen(3));
+    output->append(redis::BulkString(result.next_claim_id));
+    output->append(redis::MultiLen(result.entries.size()));
+    for (const auto &item : result.entries) {
+      if (options_.just_id) {
+        output->append(redis::BulkString(item.key));
+      } else {
+        output->append(redis::MultiLen(2));
+        output->append(redis::BulkString(item.key));
+        output->append(redis::MultiLen(item.values.size()));
+        for (const auto &value_item : item.values) {
+          output->append(redis::BulkString(value_item));
+        }
+      }
+    }
+
+    output->append(redis::MultiLen(result.deleted_ids.size()));
+    for (const auto &item : result.deleted_ids) {
+      output->append(redis::BulkString(item));
+    }
+
+    return Status::OK();
+  }
+
+  std::string key_name_;
+  std::string group_name_;
+  std::string consumer_name_;
+  StreamAutoClaimOptions options_;
+};
+
 class CommandXGroup : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -1647,6 +1740,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", 
-4, "write no-dbsize-ch
                         MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
                         MakeCmdAttr<CommandXDel>("xdel", -3, "write 
no-dbsize-check", 1, 1, 1),
                         MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 
1, 1),
+                        MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, 
"write", 1, 1, 1),
                         MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 
2, 1),
                         MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 
1, 1),
                         MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 
0, 0),
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 8b624a2e..6bf03d34 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -511,6 +511,166 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice 
&stream_name, const std::str
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+// Walks the pending entries list (PEL) of `group_name` from `options.start_id` and hands
+// entries that have been idle for at least `options.min_idle_time_ms` over to
+// `consumer_name`, creating that consumer first when it does not exist.
+//
+// At most `options.count` entries are reported (claimed plus deleted) and at most
+// `options.attempts_factors * options.count` PEL entries are examined. PEL entries whose
+// stream entry can no longer be read are removed from the PEL and returned in
+// `result->deleted_ids`. `result->next_claim_id` is the ID of the first PEL entry left
+// after the scan, or the minimum ID when the scan reached the end.
+//
+// Returns InvalidArgument for an exclusive start at the maximum ID or an undecodable entry
+// value; statuses from metadata/storage lookups are propagated unchanged.
+rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &group_name,
+                                  const std::string &consumer_name, const StreamAutoClaimOptions &options,
+                                  StreamAutoClaimResult *result) {
+  if (options.exclude_start && options.start_id.IsMaximum()) {
+    return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
+  }
+
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  StreamMetadata metadata(false);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  auto s = GetMetadata(GetOptions{}, ns_key, &metadata);
+  if (!s.ok()) {  // not found will be caught by outside with no such key or consumer group
+    return s;
+  }
+
+  // Load the claiming consumer's metadata, creating the consumer on first use.
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    int created_number = 0;
+    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, &created_number);
+    if (!s.ok()) {
+      return s;
+    }
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  StreamConsumerMetadata current_consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+  // Number of entries taken away from each previous owner, keyed by consumer name.
+  std::map<std::string, uint64_t> claimed_consumer_entity_count;
+  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
+  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());
+
+  // Iterate over a snapshot, bounded to the key range [start_id, maximum ID) of this group.
+  LatestSnapShot ss{storage_};
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice lower_bound(prefix_key);
+  rocksdb::Slice upper_bound(end_key);
+  read_options.iterate_lower_bound = &lower_bound;
+  read_options.iterate_upper_bound = &upper_bound;
+
+  auto count = options.count;
+  uint64_t attempts = options.attempts_factors * count;
+  auto now_ms = util::GetTimeStampMS();
+  std::vector<StreamEntryID> deleted_entries;
+  std::vector<StreamEntry> pending_entries;
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+  uint64_t total_claimed_count = 0;
+  for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0; iter->Next()) {
+    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
+      std::string tmp_group_name;
+      StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+      if (tmp_group_name != group_name) {
+        continue;
+      }
+
+      if (options.exclude_start && entry_id == options.start_id) {
+        continue;
+      }
+
+      attempts--;
+
+      StreamPelEntry penl_entry = decodeStreamPelEntryValue(iter->value().ToString());
+      // Clamp to zero when the recorded delivery time is ahead of the current clock.
+      // Otherwise the difference shows up as a huge value in the comparison against the
+      // unsigned min_idle_time_ms and the entry would always look idle enough to claim.
+      const uint64_t idle_ms =
+          now_ms > penl_entry.last_delivery_time_ms ? now_ms - penl_entry.last_delivery_time_ms : 0;
+      if (idle_ms < options.min_idle_time_ms) {
+        continue;
+      }
+
+      auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
+      std::string entry_value;
+      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &entry_value);
+      if (!s.ok()) {
+        if (s.IsNotFound()) {
+          // The stream entry is gone: drop the dangling PEL record and report its ID.
+          // NOTE(review): no consumer's pending_number is adjusted for this removal in
+          // this function — confirm whether the previous owner's counter should be.
+          deleted_entries.push_back(entry_id);
+          batch->Delete(stream_cf_handle_, iter->key());
+          --count;
+          continue;
+        }
+        return s;
+      }
+
+      StreamEntry entry(entry_id.ToString(), {});
+      if (!options.just_id) {
+        auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
+        if (!rv_status.OK()) {
+          return rocksdb::Status::InvalidArgument(rv_status.Msg());
+        }
+      }
+
+      pending_entries.emplace_back(std::move(entry));
+      --count;
+
+      // Only entries owned by someone else change hands; entries already owned by the
+      // claiming consumer are reported but left untouched.
+      if (penl_entry.consumer_name != consumer_name) {
+        ++total_claimed_count;
+        claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
+        penl_entry.consumer_name = consumer_name;
+        penl_entry.last_delivery_time_ms = now_ms;
+        // Increment the delivery attempts counter unless JUSTID option provided
+        if (!options.just_id) {
+          penl_entry.last_delivery_count += 1;
+        }
+        batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(penl_entry));
+      }
+    }
+  }
+
+  if (total_claimed_count > 0 && !pending_entries.empty()) {
+    current_consumer_metadata.pending_number += total_claimed_count;
+    current_consumer_metadata.last_attempted_interaction_ms = now_ms;
+
+    batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(current_consumer_metadata));
+
+    // Give back the claimed entries from each previous owner's pending counter.
+    for (const auto &[consumer, claimed_count] : claimed_consumer_entity_count) {
+      std::string tmp_consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer);
+      std::string tmp_consumer_value;
+      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, tmp_consumer_key, &tmp_consumer_value);
+      if (!s.ok()) {
+        return s;
+      }
+      StreamConsumerMetadata tmp_consumer_metadata = decodeStreamConsumerMetadataValue(tmp_consumer_value);
+      tmp_consumer_metadata.pending_number -= claimed_count;
+      batch->Put(stream_cf_handle_, tmp_consumer_key, encodeStreamConsumerMetadataValue(tmp_consumer_metadata));
+    }
+  }
+
+  // The iterator was already advanced past the last examined entry by the loop above;
+  // the first PEL entry from here on is where the next call should resume.
+  bool has_next_entry = false;
+  for (; iter->Valid(); iter->Next()) {
+    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
+      has_next_entry = true;
+      break;
+    }
+  }
+
+  if (has_next_entry) {
+    std::string tmp_group_name;
+    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+    result->next_claim_id = entry_id.ToString();
+  } else {
+    result->next_claim_id = StreamEntryID::Minimum().ToString();
+  }
+
+  result->entries = std::move(pending_entries);
+  result->deleted_ids.clear();
+  result->deleted_ids.reserve(deleted_entries.size());
+  std::transform(deleted_entries.cbegin(), deleted_entries.cend(), std::back_inserter(result->deleted_ids),
+                 [](const StreamEntryID &id) { return id.ToString(); });
+
+  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
 rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
                                     const std::string &group_name) {
   if (std::isdigit(group_name[0])) {
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index e0546ded..510cbb66 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -55,6 +55,8 @@ class Stream : public SubKeyScanner {
                                   const std::string &consumer_name, uint64_t 
min_idle_time_ms,
                                   const std::vector<StreamEntryID> &entry_ids, 
const StreamClaimOptions &options,
                                   StreamClaimResult *result);
+  // XAUTOCLAIM: scans the group's pending entries from options.start_id and transfers
+  // sufficiently idle ones to consumer_name; the outcome is written to *result.
+  rocksdb::Status AutoClaim(const Slice &stream_name, const std::string &group_name, const std::string &consumer_name,
+                            const StreamAutoClaimOptions &options, StreamAutoClaimResult *result);
   rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions 
&options, uint64_t *size);
   rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t 
count, StreamInfo *info);
   rocksdb::Status GetGroupInfo(const Slice &stream_name,
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 889e4046..82c8f945 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -173,6 +173,15 @@ struct StreamClaimOptions {
   StreamEntryID last_delivered_id;
 };
 
+struct StreamAutoClaimOptions {  // Parsed arguments of XAUTOCLAIM.
+  uint64_t min_idle_time_ms = 0;  // minimum idle time an entry needs before it may be claimed
+  uint64_t count = 100;  // upper bound on reported entries when COUNT is not given
+  uint64_t attempts_factors = 10;  // examined PEL entries are capped at attempts_factors * count
+  StreamEntryID start_id;  // first pending entry ID to consider
+  bool just_id = false;  // JUSTID: reply with IDs only, without entry values
+  bool exclude_start = false;  // true when the start ID was given with a leading '('
+};
+
 struct StreamConsumerGroupMetadata {
   uint64_t consumer_number = 0;
   uint64_t pending_number = 0;
@@ -224,6 +233,12 @@ struct StreamClaimResult {
   std::vector<StreamEntry> entries;
 };
 
+struct StreamAutoClaimResult {  // Output of Stream::AutoClaim, serialized as the XAUTOCLAIM reply.
+  std::string next_claim_id;  // ID to resume from; the minimum ID when no pending entry is left
+  std::vector<StreamEntry> entries;  // entries reported as claimed (values stay empty with JUSTID)
+  std::vector<std::string> deleted_ids;  // IDs removed from the PEL because the stream entry no longer exists
+};
+
 Status IncrementStreamEntryID(StreamEntryID *id);
 Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
 StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> 
ParseNextStreamEntryIDStrategy(const std::string &input);
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 7297bb66..b8958e70 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1500,6 +1500,410 @@ func TestStreamOffset(t *testing.T) {
                require.Len(t, claimedIDs, 1, "Expected to claim exactly one 
message ID")
                require.Equal(t, "1-0", claimedIDs[0], "Expected claimed 
message ID to match")
        })
+
+       t.Run("XAUTOCLAIM can claim PEL items from another consume", func(t 
*testing.T) {
+
+               streamName := "mystream"
+               groupName := "mygroup"
+               var id1 string
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"a", "1"},
+                       })
+                       require.NoError(t, rsp.Err())
+                       id1 = rsp.Val()
+               }
+               var id2 string
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"b", "2"},
+                       })
+                       require.NoError(t, rsp.Err())
+                       id2 = rsp.Val()
+               }
+               var id3 string
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"c", "3"},
+                       })
+                       require.NoError(t, rsp.Err())
+                       id3 = rsp.Val()
+               }
+               var id4 string
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"d", "4"},
+                       })
+                       require.NoError(t, rsp.Err())
+                       id4 = rsp.Val()
+               }
+
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+
+               consumer1 := "consumer1"
+               consumer2 := "consumer2"
+               {
+                       rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                               Group:    groupName,
+                               Consumer: consumer1,
+                               Streams:  []string{streamName, ">"},
+                               Count:    1,
+                       })
+                       require.NoError(t, rsp.Err())
+                       require.Len(t, rsp.Val(), 1)
+                       require.Len(t, rsp.Val()[0].Messages, 1)
+                       require.Equal(t, id1, rsp.Val()[0].Messages[0].ID)
+                       require.Len(t, rsp.Val()[0].Messages[0].Values, 1)
+                       require.Equal(t, "1", 
rsp.Val()[0].Messages[0].Values["a"])
+               }
+
+               {
+                       time.Sleep(200 * time.Millisecond)
+                       rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+                               Stream:   streamName,
+                               Group:    groupName,
+                               Consumer: consumer2,
+                               MinIdle:  10 * time.Millisecond,
+                               Count:    1,
+                               Start:    "-",
+                       })
+                       require.NoError(t, rsp.Err())
+                       msgs, start := rsp.Val()
+                       require.Equal(t, "0-0", start)
+                       require.Len(t, msgs, 1)
+                       require.Len(t, msgs[0].Values, 1)
+                       require.Equal(t, "1", msgs[0].Values["a"])
+               }
+
+               {
+                       rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                               Group:    groupName,
+                               Consumer: consumer1,
+                               Streams:  []string{streamName, ">"},
+                               Count:    3,
+                       })
+                       require.NoError(t, rsp.Err())
+
+                       time.Sleep(time.Millisecond * 200)
+                       require.NoError(t, rdb.XDel(ctx, streamName, id2).Err())
+               }
+
+               {
+                       cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, 
consumer2, 10, "-", "COUNT", 3)
+                       require.NoError(t, cmd.Err())
+                       require.Equal(t, []interface{}{
+                               id4,
+                               []interface{}{
+                                       []interface{}{
+                                               id1,
+                                               []interface{}{"a", "1"},
+                                       },
+                                       []interface{}{
+                                               id3,
+                                               []interface{}{"c", "3"},
+                                       },
+                               },
+                               []interface{}{
+                                       id2,
+                               },
+                       }, cmd.Val())
+               }
+
+               {
+                       time.Sleep(time.Millisecond * 200)
+                       require.NoError(t, rdb.XDel(ctx, streamName, id4).Err())
+                       rsp := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
+                               Stream:   streamName,
+                               Group:    groupName,
+                               Consumer: consumer2,
+                               MinIdle:  10 * time.Millisecond,
+                               Start:    "-",
+                       })
+                       require.NoError(t, rsp.Err())
+                       msgs, start := rsp.Val()
+                       require.Equal(t, "0-0", start)
+                       require.Len(t, msgs, 2)
+                       require.Equal(t, id1, msgs[0])
+                       require.Equal(t, id3, msgs[1])
+               }
+       })
+
+       t.Run("XAUTOCLAIM as an iterator", func(t *testing.T) {
+               streamName := "mystream"
+               groupName := "mygroup"
+               var id3, id5 string
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"a", "1"},
+                       })
+                       require.NoError(t, rsp.Err())
+               }
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"b", "2"},
+                       })
+                       require.NoError(t, rsp.Err())
+               }
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"c", "3"},
+                       })
+                       require.NoError(t, rsp.Err())
+                       id3 = rsp.Val()
+               }
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"d", "4"},
+                       })
+                       require.NoError(t, rsp.Err())
+               }
+               {
+                       rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     "*",
+                               Values: []string{"e", "5"},
+                       })
+                       require.NoError(t, rsp.Err())
+                       id5 = rsp.Val()
+               }
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+
+               consumer1, consumer2 := "consumer1", "consumer2"
+               {
+                       rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                               Group:    groupName,
+                               Consumer: consumer1,
+                               Streams:  []string{streamName, ">"},
+                               Count:    90,
+                       })
+                       require.NoError(t, rsp.Err())
+                       time.Sleep(200 * time.Millisecond)
+               }
+               {
+                       rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+                               Stream:   streamName,
+                               Group:    groupName,
+                               Consumer: consumer2,
+                               MinIdle:  10 * time.Millisecond,
+                               Count:    2,
+                               Start:    "-",
+                       })
+                       require.NoError(t, rsp.Err())
+                       msgs, start := rsp.Val()
+                       require.Equal(t, id3, start)
+                       require.Len(t, msgs, 2)
+                       require.Len(t, msgs[0].Values, 1)
+                       require.Equal(t, "1", msgs[0].Values["a"])
+               }
+
+               {
+                       rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+                               Stream:   streamName,
+                               Group:    groupName,
+                               Consumer: consumer2,
+                               MinIdle:  10 * time.Millisecond,
+                               Start:    id3,
+                               Count:    2,
+                       })
+                       require.NoError(t, rsp.Err())
+                       msgs, start := rsp.Val()
+                       require.Equal(t, id5, start)
+                       require.Len(t, msgs, 2)
+                       require.Len(t, msgs[0].Values, 1)
+                       require.Equal(t, "3", msgs[0].Values["c"])
+               }
+
+               {
+                       rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+                               Stream:   streamName,
+                               Group:    groupName,
+                               Consumer: consumer2,
+                               MinIdle:  10 * time.Millisecond,
+                               Start:    id5,
+                               Count:    1,
+                       })
+                       require.NoError(t, rsp.Err())
+                       msgs, start := rsp.Val()
+                       require.Equal(t, "0-0", start)
+                       require.Len(t, msgs, 1)
+                       require.Len(t, msgs[0].Values, 1)
+                       require.Equal(t, "5", msgs[0].Values["e"])
+               }
+       })
+
+       t.Run("XAUTOCLAIM with XDEL", func(t *testing.T) {
+               streamName := "x"
+               groupName := "grp"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: map[string]interface{}{"f": "v"},
+               }).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "2-0",
+                       Values: map[string]interface{}{"f": "v"},
+               }).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "3-0",
+                       Values: map[string]interface{}{"f": "v"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+               {
+                       rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                               Group:    groupName,
+                               Consumer: "Alice",
+                               Streams:  []string{streamName, ">"},
+                       })
+                       require.NoError(t, rsp.Err())
+                       require.Len(t, rsp.Val(), 1)
+                       require.Len(t, rsp.Val()[0].Messages, 3)
+                       require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
+                       require.Equal(t, "v", 
rsp.Val()[0].Messages[0].Values["f"])
+                       require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
+                       require.Equal(t, "v", 
rsp.Val()[0].Messages[1].Values["f"])
+                       require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
+                       require.Equal(t, "v", 
rsp.Val()[0].Messages[2].Values["f"])
+               }
+               {
+                       require.NoError(t, rdb.XDel(ctx, streamName, 
"2-0").Err())
+                       cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, 
"Bob", 0, "0-0")
+                       require.NoError(t, cmd.Err())
+                       require.Equal(t, []interface{}{
+                               "0-0",
+                               []interface{}{
+                                       []interface{}{
+                                               "1-0",
+                                               []interface{}{"f", "v"},
+                                       },
+                                       []interface{}{
+                                               "3-0",
+                                               []interface{}{"f", "v"},
+                                       },
+                               },
+                               []interface{}{
+                                       "2-0",
+                               },
+                       }, cmd.Val())
+               }
+       })
+
+       t.Run("XAUTOCLAIM with XDEL and count", func(t *testing.T) {
+               streamName := "x"
+               groupName := "grp"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: map[string]interface{}{"f": "v"},
+               }).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "2-0",
+                       Values: map[string]interface{}{"f": "v"},
+               }).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "3-0",
+                       Values: map[string]interface{}{"f": "v"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+               {
+                       rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                               Group:    groupName,
+                               Consumer: "Alice",
+                               Streams:  []string{streamName, ">"},
+                       })
+                       require.NoError(t, rsp.Err())
+                       require.Len(t, rsp.Val(), 1)
+                       require.Len(t, rsp.Val()[0].Messages, 3)
+                       require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
+                       require.Equal(t, "v", 
rsp.Val()[0].Messages[0].Values["f"])
+                       require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
+                       require.Equal(t, "v", 
rsp.Val()[0].Messages[1].Values["f"])
+                       require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
+                       require.Equal(t, "v", 
rsp.Val()[0].Messages[2].Values["f"])
+               }
+               {
+                       require.NoError(t, rdb.XDel(ctx, streamName, 
"1-0").Err())
+                       require.NoError(t, rdb.XDel(ctx, streamName, 
"2-0").Err())
+                       cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, 
"Bob", 0, "0-0", "COUNT", 1)
+                       require.NoError(t, cmd.Err())
+                       require.Equal(t, []interface{}{
+                               "2-0",
+                               []interface{}{},
+                               []interface{}{
+                                       "1-0",
+                               },
+                       }, cmd.Val())
+               }
+               {
+                       cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, 
"Bob", 0, "2-0", "COUNT", 1)
+                       require.NoError(t, cmd.Err())
+                       require.Equal(t, []interface{}{
+                               "3-0",
+                               []interface{}{},
+                               []interface{}{
+                                       "2-0",
+                               },
+                       }, cmd.Val())
+               }
+               {
+                       cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, 
"Bob", 0, "3-0", "COUNT", 1)
+                       require.NoError(t, cmd.Err())
+                       require.Equal(t, []interface{}{
+                               "0-0",
+                               []interface{}{
+                                       []interface{}{
+                                               "3-0",
+                                               []interface{}{"f", "v"},
+                                       },
+                               },
+                               []interface{}{},
+                       }, cmd.Val())
+               }
+               // assert_equal [XPENDING x grp - + 10 Alice] {}
+               // add xpending to this test case when it is supported
+       })
+
+       t.Run("XAUTOCLAIM with out of range count", func(t *testing.T) {
+               err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+                       Stream:   "x",
+                       Group:    "grp",
+                       Consumer: "Bob",
+                       MinIdle:  0,
+                       Start:    "3-0",
+                       Count:    8070450532247928833,
+               }).Err()
+               require.Error(t, err)
+               require.True(t, strings.HasPrefix(err.Error(), "ERR COUNT"))
+       })
+
+       t.Run("XAUTOCLAIM COUNT must be > 0", func(t *testing.T) {
+               cmd := rdb.Do(ctx, "XAUTOCLAIM", "key", "group", "consumer", 1, 
1, "COUNT", 0)
+               require.Error(t, cmd.Err())
+               require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error())
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to