This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new a0dae44d Add support of the command XCLAIM (#2202)
a0dae44d is described below

commit a0dae44d2f2cbf37fdeecbffcb3e16b7b2c3ff12
Author: Rebecca Zhou <[email protected]>
AuthorDate: Mon May 13 05:53:28 2024 -0400

    Add support of the command XCLAIM (#2202)
    
    Co-authored-by: 纪华裕 <[email protected]>
---
 src/commands/cmd_stream.cc                   | 123 +++++++++++++++++++
 src/types/redis_stream.cc                    | 132 +++++++++++++++++++++
 src/types/redis_stream.h                     |   4 +
 src/types/redis_stream_base.h                |  17 +++
 tests/gocase/unit/type/stream/stream_test.go | 171 +++++++++++++++++++++++++++
 5 files changed, 447 insertions(+)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index a5fdf795..8468ee30 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -242,6 +242,128 @@ class CommandXDel : public Commander {
   std::vector<redis::StreamEntryID> ids_;
 };
 
+class CommandXClaim : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 6) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    CommandParser parser(args, 1);
+    stream_name_ = GET_OR_RET(parser.TakeStr());
+    group_name_ = GET_OR_RET(parser.TakeStr());
+    consumer_name_ = GET_OR_RET(parser.TakeStr());
+    auto parse_result = parser.TakeInt<int64_t>();
+    if (!parse_result.IsOK()) {
+      return {Status::RedisParseErr, errValueNotInteger};
+    }
+    min_idle_time_ms_ = parse_result.GetValue();
+    if (min_idle_time_ms_ < 0) {
+      min_idle_time_ms_ = 0;
+    }
+
+    while (parser.Good() && !isOption(parser.RawPeek())) {
+      auto raw_id = GET_OR_RET(parser.TakeStr());
+      redis::StreamEntryID id;
+      auto s = ParseStreamEntryID(raw_id, &id);
+      if (!s.IsOK()) {
+        return s;
+      }
+      entry_ids_.emplace_back(id);
+    }
+
+    while (parser.Good()) {
+      if (parser.EatEqICase("idle")) {
+        auto parse_result = parser.TakeInt<int64_t>();
+        if (!parse_result.IsOK()) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+        if (parse_result.GetValue() < 0) {
+          return {Status::RedisParseErr, "IDLE for XCLAIM must be 
non-negative"};
+        }
+        stream_claim_options_.idle_time_ms = parse_result.GetValue();
+      } else if (parser.EatEqICase("time")) {
+        auto parse_result = parser.TakeInt<int64_t>();
+        if (!parse_result.IsOK()) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+        if (parse_result.GetValue() < 0) {
+          return {Status::RedisParseErr, "TIME for XCLAIM must be 
non-negative"};
+        }
+        stream_claim_options_.with_time = true;
+        stream_claim_options_.last_delivery_time_ms = parse_result.GetValue();
+      } else if (parser.EatEqICase("retrycount")) {
+        auto parse_result = parser.TakeInt<int64_t>();
+        if (!parse_result.IsOK()) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+        if (parse_result.GetValue() < 0) {
+          return {Status::RedisParseErr, "RETRYCOUNT for XCLAIM must be 
non-negative"};
+        }
+        stream_claim_options_.with_retry_count = true;
+        stream_claim_options_.last_delivery_count = parse_result.GetValue();
+      } else if (parser.EatEqICase("force")) {
+        stream_claim_options_.force = true;
+      } else if (parser.EatEqICase("justid")) {
+        stream_claim_options_.just_id = true;
+      } else if (parser.EatEqICase("lastid")) {
+        auto last_id = GET_OR_RET(parser.TakeStr());
+        auto s = ParseStreamEntryID(last_id, 
&stream_claim_options_.last_delivered_id);
+        if (!s.IsOK()) {
+          return s;
+        }
+      } else {
+        return parser.InvalidSyntax();
+      }
+    }
+    return Status::OK();
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Stream stream_db(srv->storage, conn->GetNamespace());
+    StreamClaimResult result;
+    auto s = stream_db.ClaimPelEntries(stream_name_, group_name_, 
consumer_name_, min_idle_time_ms_, entry_ids_,
+                                       stream_claim_options_, &result);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    if (s.IsNotFound()) {
+      return {Status::RedisExecErr, errNoSuchKey};
+    }
+
+    if (!stream_claim_options_.just_id) {
+      output->append(redis::MultiLen(result.entries.size()));
+
+      for (const auto &e : result.entries) {
+        output->append(redis::MultiLen(2));
+        output->append(redis::BulkString(e.key));
+        output->append(conn->MultiBulkString(e.values));
+      }
+    } else {
+      output->append(redis::MultiLen(result.ids.size()));
+      for (const auto &id : result.ids) {
+        output->append(redis::BulkString(id));
+      }
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  std::string stream_name_;
+  std::string group_name_;
+  std::string consumer_name_;
+  uint64_t min_idle_time_ms_;
+  std::vector<StreamEntryID> entry_ids_;
+  StreamClaimOptions stream_claim_options_;
+
+  bool static isOption(const std::string &arg) {
+    static const std::unordered_set<std::string> options = {"idle", "time", 
"retrycount", "force", "justid", "lastid"};
+    return options.find(util::ToLower(arg)) != options.end();
+  }
+};
+
 class CommandXGroup : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -1534,6 +1656,7 @@ class CommandXSetId : public Commander {
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write 
no-dbsize-check", 1, 1, 1),
                         MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
                         MakeCmdAttr<CommandXDel>("xdel", -3, "write 
no-dbsize-check", 1, 1, 1),
+                        MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 
1, 1),
                         MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 
2, 1),
                         MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 
1, 1),
                         MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 
0, 0),
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 6c66f800..a3bdc63b 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -357,6 +357,138 @@ rocksdb::Status Stream::DeletePelEntries(const Slice 
&stream_name, const std::st
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const 
std::string &group_name,
+                                        const std::string &consumer_name, 
const uint64_t min_idle_time_ms,
+                                        const std::vector<StreamEntryID> 
&entry_ids, const StreamClaimOptions &options,
+                                        StreamClaimResult *result) {
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
+  if (!s.ok()) return s;
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, 
&get_group_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+  StreamConsumerGroupMetadata group_metadata = 
decodeStreamConsumerGroupMetadataValue(get_group_value);
+
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, 
&get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    int created_number = 0;
+    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, 
&created_number);
+    if (!s.ok()) {
+      return s;
+    }
+    group_metadata.consumer_number += created_number;
+  }
+  StreamConsumerMetadata consumer_metadata;
+  if (!s.IsNotFound()) {
+    consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+  }
+  auto now = util::GetTimeStampMS();
+  consumer_metadata.last_idle_ms = now;
+  consumer_metadata.last_active_ms = now;
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  for (const auto &id : entry_ids) {
+    std::string raw_value;
+    rocksdb::Status s = getEntryRawValue(ns_key, metadata, id, &raw_value);
+    if (!s.ok() && !s.IsNotFound()) {
+      return s;
+    }
+    if (s.IsNotFound()) continue;
+
+    std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, 
metadata, group_name, id);
+    std::string value;
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&value);
+    StreamPelEntry pel_entry;
+
+    if (!s.ok() && s.IsNotFound() && options.force) {
+      pel_entry = {0, 0, ""};
+      group_metadata.pending_number += 1;
+    }
+
+    if (s.ok()) {
+      pel_entry = decodeStreamPelEntryValue(value);
+    }
+
+    if (s.ok() || (s.IsNotFound() && options.force)) {
+      if (now - pel_entry.last_delivery_time_ms < min_idle_time_ms) continue;
+
+      std::vector<std::string> values;
+      if (options.just_id) {
+        result->ids.emplace_back(id.ToString());
+      } else {
+        auto rv = DecodeRawStreamEntryValue(raw_value, &values);
+        if (!rv.IsOK()) {
+          return rocksdb::Status::InvalidArgument(rv.Msg());
+        }
+        result->entries.emplace_back(id.ToString(), std::move(values));
+      }
+
+      if (pel_entry.consumer_name != "") {
+        std::string original_consumer_key =
+            internalKeyFromConsumerName(ns_key, metadata, group_name, 
pel_entry.consumer_name);
+        std::string get_original_consumer_value;
+        s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, 
original_consumer_key,
+                          &get_original_consumer_value);
+        if (!s.ok()) {
+          return s;
+        }
+        StreamConsumerMetadata original_consumer_metadata =
+            decodeStreamConsumerMetadataValue(get_original_consumer_value);
+        original_consumer_metadata.pending_number -= 1;
+        batch->Put(stream_cf_handle_, original_consumer_key,
+                   
encodeStreamConsumerMetadataValue(original_consumer_metadata));
+      }
+
+      pel_entry.consumer_name = consumer_name;
+      consumer_metadata.pending_number += 1;
+      if (options.with_time) {
+        pel_entry.last_delivery_time_ms = options.last_delivery_time_ms;
+      } else {
+        pel_entry.last_delivery_time_ms = now - options.idle_time_ms;
+      }
+
+      if (pel_entry.last_delivery_time_ms < 0 || 
pel_entry.last_delivery_time_ms > now) {
+        pel_entry.last_delivery_time_ms = now;
+      }
+
+      if (options.with_retry_count) {
+        pel_entry.last_delivery_count = options.last_delivery_count;
+      } else if (!options.just_id) {
+        pel_entry.last_delivery_count += 1;
+      }
+
+      std::string pel_value = encodeStreamPelEntryValue(pel_entry);
+      batch->Put(stream_cf_handle_, entry_key, pel_value);
+    }
+  }
+
+  if (options.with_last_id && options.last_delivered_id > 
group_metadata.last_delivered_id) {
+    group_metadata.last_delivered_id = options.last_delivered_id;
+  }
+
+  batch->Put(stream_cf_handle_, consumer_key, 
encodeStreamConsumerMetadataValue(consumer_metadata));
+  batch->Put(stream_cf_handle_, group_key, 
encodeStreamConsumerGroupMetadataValue(group_metadata));
+  return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
 rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
                                     const std::string &group_name) {
   if (std::isdigit(group_name[0])) {
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 662f93f7..2e867a85 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -51,6 +51,10 @@ class Stream : public SubKeyScanner {
   rocksdb::Status DeleteEntries(const Slice &stream_name, const 
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
   rocksdb::Status DeletePelEntries(const Slice &stream_name, const std::string 
&group_name,
                                    const std::vector<StreamEntryID> 
&entry_ids, uint64_t *acknowledged);
+  rocksdb::Status ClaimPelEntries(const Slice &stream_name, const std::string 
&group_name,
+                                  const std::string &consumer_name, uint64_t 
min_idle_time_ms,
+                                  const std::vector<StreamEntryID> &entry_ids, 
const StreamClaimOptions &options,
+                                  StreamClaimResult *result);
   rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions 
&options, uint64_t *size);
   rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t 
count, StreamInfo *info);
   rocksdb::Status GetGroupInfo(const Slice &stream_name,
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index ae90a3e5..efea48ec 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -161,6 +161,18 @@ struct StreamXGroupCreateOptions {
   std::string last_id;
 };
 
+struct StreamClaimOptions {
+  uint64_t idle_time_ms = 0;
+  bool with_time = false;
+  bool with_retry_count = false;
+  bool force = false;
+  bool just_id = false;
+  bool with_last_id = false;
+  uint64_t last_delivery_time_ms;
+  uint64_t last_delivery_count;
+  StreamEntryID last_delivered_id;
+};
+
 struct StreamConsumerGroupMetadata {
   uint64_t consumer_number = 0;
   uint64_t pending_number = 0;
@@ -207,6 +219,11 @@ struct StreamReadResult {
       : name(std::move(name)), entries(std::move(result)) {}
 };
 
+struct StreamClaimResult {
+  std::vector<std::string> ids;
+  std::vector<StreamEntry> entries;
+};
+
 Status IncrementStreamEntryID(StreamEntryID *id);
 Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
 StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> 
ParseNextStreamEntryIDStrategy(const std::string &input);
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index ad6189db..d6a09a9e 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1294,6 +1294,177 @@ func TestStreamOffset(t *testing.T) {
                require.NoError(t, err)
                require.Equal(t, int64(3), r)
        })
+
+       t.Run("Simple XCLAIM command tests", func(t *testing.T) {
+               streamName := "mystream"
+               groupName := "mygroup"
+               consumerName := "myconsumer"
+               consumer1Name := "myconsumer1"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+               r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               require.Equal(t, []redis.XStream{{
+                       Stream:   streamName,
+                       Messages: []redis.XMessage{{ID: "1-0", Values: 
map[string]interface{}{"field1": "data1"}}},
+               }}, r)
+
+               claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Consumer: consumer1Name,
+                       MinIdle:  0,
+                       Messages: []string{"1-0"},
+               }).Result()
+               require.NoError(t, err)
+               require.Len(t, claimedMessages, 1, "Expected to claim 1 
message")
+               require.Equal(t, "1-0", claimedMessages[0].ID, "Expected 
claimed message ID to match")
+
+               time.Sleep(2000 * time.Millisecond)
+               minIdleTime := 1000 * time.Millisecond
+               claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       MinIdle:  minIdleTime,
+                       Messages: []string{"1-0"},
+               }).Result()
+               require.NoError(t, err)
+               require.Len(t, claimedMessages, 1, "Expected to claim 1 message 
if idle time is large enough")
+               require.Equal(t, "1-0", claimedMessages[0].ID, "Expected 
claimed message ID to match")
+
+               minIdleTime = 60000 * time.Millisecond
+               claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Consumer: consumer1Name,
+                       MinIdle:  minIdleTime,
+                       Messages: []string{"1-0"},
+               }).Result()
+
+               require.NoError(t, err)
+               require.Empty(t, claimedMessages, "Expected no messages to be 
claimed due to insufficient idle time")
+       })
+
+       t.Run("XCLAIM with different timing situations and options", func(t 
*testing.T) {
+               streamName := "mystream"
+               groupName := "mygroup"
+               consumerName := "myconsumer"
+               consumer1Name := "myconsumer1"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+               r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               require.Equal(t, []redis.XStream{{
+                       Stream:   streamName,
+                       Messages: []redis.XMessage{{ID: "1-0", Values: 
map[string]interface{}{"field1": "data1"}}},
+               }}, r)
+
+               rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName, 
groupName, consumer1Name, "0", "1-0", "IDLE", "5000").Result()
+               require.NoError(t, err)
+               messages, ok := rawClaimedMessages.([]interface{})
+               require.True(t, ok, "Expected the result to be a slice of 
interface{}")
+               firstMsg, ok := messages[0].([]interface{})
+               require.True(t, ok, "Expected message details to be a slice of 
interface{}")
+               msgID, ok := firstMsg[0].(string)
+               require.True(t, ok, "Expected message ID to be a string")
+               require.Equal(t, "1-0", msgID, "Expected claimed message ID to 
match")
+
+               claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       MinIdle:  2000 * time.Millisecond,
+                       Messages: []string{"1-0"},
+               }).Result()
+               require.NoError(t, err)
+               require.Len(t, claimedMessages, 1, "Expected to claim 1 message 
if idle time is large enough")
+               require.Equal(t, "1-0", claimedMessages[0].ID, "Expected 
claimed message ID to match")
+
+               tenSecondsAgo := time.Now().Add(-10 * time.Second).UnixMilli()
+               rawClaimedMessages, err = rdb.Do(ctx, "XCLAIM", streamName, 
groupName, consumer1Name, "0", "1-0", "TIME", tenSecondsAgo).Result()
+               require.NoError(t, err)
+               messages, ok = rawClaimedMessages.([]interface{})
+               require.True(t, ok, "Expected the result to be a slice of 
interface{}")
+               firstMsg, ok = messages[0].([]interface{})
+               require.True(t, ok, "Expected message details to be a slice of 
interface{}")
+               msgID, ok = firstMsg[0].(string)
+               require.True(t, ok, "Expected message ID to be a string")
+               require.Equal(t, "1-0", msgID, "Expected claimed message ID to 
match")
+
+               claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       MinIdle:  5000 * time.Millisecond,
+                       Messages: []string{"1-0"},
+               }).Result()
+               require.NoError(t, err)
+               require.Len(t, claimedMessages, 1, "Expected to claim 1 message 
if idle time is large enough")
+               require.Equal(t, "1-0", claimedMessages[0].ID, "Expected 
claimed message ID to match")
+       })
+
+       t.Run("XCLAIM command with different options", func(t *testing.T) {
+               streamName := "mystream"
+               groupName := "mygroup"
+               consumerName := "myconsumer"
+               consumer1Name := "myconsumer1"
+
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+
+               rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName, 
groupName, consumerName, "0", "1-0", "FORCE").Result()
+               require.NoError(t, err)
+               messages, ok := rawClaimedMessages.([]interface{})
+               require.True(t, ok, "Expected the result to be a slice of 
interface{}")
+               firstMsg, ok := messages[0].([]interface{})
+               require.True(t, ok, "Expected message details to be a slice of 
interface{}")
+               msgID, ok := firstMsg[0].(string)
+               require.True(t, ok, "Expected message ID to be a string")
+               require.Equal(t, "1-0", msgID, "Expected claimed message ID to 
match")
+
+               cmd := rdb.XClaimJustID(ctx, &redis.XClaimArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Consumer: consumer1Name,
+                       MinIdle:  0,
+                       Messages: []string{"1-0"},
+               })
+
+               claimedIDs, err := cmd.Result()
+               require.NoError(t, err)
+               require.Len(t, claimedIDs, 1, "Expected to claim exactly one 
message ID")
+               require.Equal(t, "1-0", claimedIDs[0], "Expected claimed 
message ID to match")
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to