This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 79ad8a52 Add the support of the XREADGROUP command (#2120)
79ad8a52 is described below

commit 79ad8a52840af5fe6a046a19b7329a84855452a0
Author: Hauru <[email protected]>
AuthorDate: Fri Mar 1 16:19:14 2024 +0800

    Add the support of the XREADGROUP command (#2120)
---
 src/commands/cmd_stream.cc                   | 298 +++++++++++++++++++++++++++
 src/types/redis_stream.cc                    | 195 +++++++++++++++++-
 src/types/redis_stream.h                     |  10 +
 src/types/redis_stream_base.h                |   6 +
 tests/gocase/unit/type/stream/stream_test.go | 138 +++++++++++++
 5 files changed, 644 insertions(+), 3 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 76e2146b..04ea2942 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1036,6 +1036,303 @@ class CommandXRead : public Commander,
   void unblockAll() { srv_->UnblockOnStreams(streams_, conn_); }
 };
 
+/// Implements XREADGROUP:
+///   XREADGROUP GROUP <group> <consumer> [COUNT <n>] [BLOCK <ms>] [NOACK] STREAMS <key>... <id>...
+///
+/// Each ID is either ">" (recorded in latest_marks_ and forwarded to
+/// Stream::RangeWithPending as its `latest` flag) or an explicit entry ID.
+/// When BLOCK was requested, every ID is ">" and nothing could be read right
+/// away, the connection is registered on the streams via BlockOnStreams and
+/// the reply is produced later from OnWrite or TimerCB.
+class CommandXReadGroup : public Commander,
+                          private EvbufCallbackBase<CommandXReadGroup, false>,
+                          private EventCallbackBase<CommandXReadGroup> {
+ public:
+  /// Parses the command line into the member fields.
+  ///
+  /// Returns RedisParseErr when GROUP or STREAMS is missing, when COUNT or
+  /// BLOCK is not followed by a valid integer (BLOCK must be non-negative) or
+  /// when the key/ID list is unbalanced; an error from ParseStreamEntryID is
+  /// returned unchanged.
+  /// NOTE(review): args[1..3] are read without a size check; this presumably
+  /// relies on the minimum arity the command is registered with - confirm.
+  Status Parse(const std::vector<std::string> &args) override {
+    if (util::ToLower(args[1]) != "group") {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+    group_name_ = args[2];
+    consumer_name_ = args[3];
+
+    // Index of the STREAMS keyword; 0 means "not seen" (index 0 is the command name).
+    size_t streams_word_idx = 0;
+
+    for (size_t i = 4; i < args.size();) {
+      auto arg = util::ToLower(args[i]);
+
+      if (arg == "streams") {
+        streams_word_idx = i;
+        break;
+      }
+
+      if (arg == "count") {
+        if (i + 1 >= args.size()) {
+          return {Status::RedisParseErr, errInvalidSyntax};
+        }
+
+        with_count_ = true;
+
+        auto parse_result = ParseInt<uint64_t>(args[i + 1], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+        i += 2;
+        continue;
+      }
+
+      if (arg == "block") {
+        if (i + 1 >= args.size()) {
+          return {Status::RedisParseErr, errInvalidSyntax};
+        }
+
+        block_ = true;
+
+        auto parse_result = ParseInt<int64_t>(args[i + 1], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        if (*parse_result < 0) {
+          return {Status::RedisParseErr, errTimeoutIsNegative};
+        }
+
+        block_timeout_ = *parse_result;
+        i += 2;
+        continue;
+      }
+
+      if (arg == "noack") {
+        noack_ = true;
+      }
+
+      // Any other token in front of STREAMS is skipped without an error.
+      ++i;
+    }
+
+    if (streams_word_idx == 0) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    // Everything after STREAMS must be N keys followed by N IDs.
+    if ((args.size() - streams_word_idx - 1) % 2 != 0) {
+      return {Status::RedisParseErr, errUnbalancedStreamList};
+    }
+
+    size_t number_of_streams = (args.size() - streams_word_idx - 1) / 2;
+
+    for (size_t i = streams_word_idx + 1; i <= streams_word_idx + number_of_streams; ++i) {
+      streams_.push_back(args[i]);
+      const auto &id_str = args[i + number_of_streams];
+      bool get_latest = id_str == ">";
+      latest_marks_.push_back(get_latest);
+
+      // For ">" the ID stays default-constructed; RangeWithPending substitutes
+      // the group's last delivered ID.
+      StreamEntryID id;
+      if (!get_latest) {
+        // Blocking is only kept when every stream asks for ">".
+        block_ = false;
+        auto s = ParseStreamEntryID(id_str, &id);
+        if (!s.IsOK()) {
+          return s;
+        }
+      }
+      ids_.push_back(id);
+    }
+
+    return Status::OK();
+  }
+
+  /// Reads all requested streams once. Replies immediately when something was
+  /// read, replies with a nil multi-bulk when nothing was read and blocking is
+  /// off (or the connection is inside MULTI/EXEC), and otherwise switches to
+  /// the blocking path.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Stream stream_db(srv->storage, conn->GetNamespace());
+
+    std::vector<redis::StreamReadResult> results;
+    auto s = readStreams(&stream_db, &results);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    if (!results.empty()) {
+      return SendResults(conn, output, results);
+    }
+
+    if (block_ && !conn->IsInExec()) {
+      return BlockingRead(srv, conn, &stream_db);
+    }
+
+    // Nothing to deliver and either BLOCK was not requested or we are inside
+    // MULTI/EXEC, where blocking is not allowed.
+    *output = redis::MultiLen(-1);
+    return Status::OK();
+  }
+
+  /// Serialises `results` into `output`: one [name, entries] pair per stream,
+  /// each entry being [id, fields]. For a stream that was read with an
+  /// explicit ID, an entry without any values is emitted with a nil payload.
+  ///
+  /// `results` is expected to be the vector filled by the preceding read, so
+  /// that result_stream_idx_ maps each result back to its STREAMS position.
+  Status SendResults(Connection *conn, std::string *output, const std::vector<StreamReadResult> &results) {
+    output->append(redis::MultiLen(results.size()));
+    for (size_t pos = 0; pos < results.size(); ++pos) {
+      const auto &result = results[pos];
+      // Streams without entries are not part of `results`, so `pos` is not the
+      // STREAMS position; resolve the ">" flag through the recorded mapping.
+      const bool latest_read = isLatestRead(pos);
+
+      output->append(redis::MultiLen(2));
+      output->append(redis::BulkString(result.name));
+      output->append(redis::MultiLen(result.entries.size()));
+      for (const auto &entry : result.entries) {
+        output->append(redis::MultiLen(2));
+        output->append(redis::BulkString(entry.key));
+        if (entry.values.empty() && !latest_read) {
+          output->append(conn->NilString());
+          continue;
+        }
+        output->append(conn->MultiBulkString(entry.values));
+      }
+    }
+
+    return Status::OK();
+  }
+
+  /// Registers the connection as blocked on all requested streams, installs
+  /// this object's callbacks on the connection's bufferevent and, for a
+  /// positive BLOCK timeout, arms a timer. Returns Status::BlockingCmd.
+  /// `stream_db` is not used.
+  Status BlockingRead(Server *srv, Connection *conn, redis::Stream *stream_db) {
+    if (!with_count_) {
+      // Cap the number of entries returned once the blocked read wakes up.
+      with_count_ = true;
+      count_ = blocked_default_count_;
+    }
+
+    srv_ = srv;
+    conn_ = conn;
+
+    srv_->BlockOnStreams(streams_, ids_, conn_);
+
+    auto bev = conn->GetBufferEvent();
+    SetCB(bev);
+
+    // A timeout of 0 means no timer is armed at all.
+    if (block_timeout_ > 0) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      // Split milliseconds into whole seconds and the sub-second remainder so
+      // that tv_usec always stays below 1,000,000 (also for exactly 1000 ms).
+      timeval tm;
+      tm.tv_sec = block_timeout_ / 1000;
+      tm.tv_usec = (block_timeout_ % 1000) * 1000;
+
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  /// Write callback installed by BlockingRead through SetCB. Cancels the
+  /// timer, unblocks the connection, hands the bufferevent back to the
+  /// connection and replies with whatever can be read now.
+  /// NOTE(review): presumably fired when one of the blocked streams receives
+  /// new data - confirm against Server::BlockOnStreams.
+  void OnWrite(bufferevent *bev) {
+    if (timer_ != nullptr) {
+      timer_.reset();
+    }
+
+    unblockAll();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+
+    redis::Stream stream_db(srv_->storage, conn_->GetNamespace());
+
+    std::vector<StreamReadResult> results;
+    auto s = readStreams(&stream_db, &results);
+    if (!s.ok()) {
+      conn_->Reply(redis::Error("ERR " + s.ToString()));
+      return;
+    }
+
+    if (results.empty()) {
+      // Exactly one reply per command: do not fall through to SendReply, which
+      // would append a second (empty array) reply.
+      conn_->Reply(redis::MultiLen(-1));
+      return;
+    }
+
+    SendReply(results);
+  }
+
+  /// Serialises `results` and sends them on the blocked connection. Only used
+  /// on the blocking path, where Parse guarantees every stream was read with ">".
+  void SendReply(const std::vector<StreamReadResult> &results) {
+    std::string output;
+
+    output.append(redis::MultiLen(results.size()));
+
+    for (const auto &result : results) {
+      output.append(redis::MultiLen(2));
+      output.append(redis::BulkString(result.name));
+      output.append(redis::MultiLen(result.entries.size()));
+      for (const auto &entry : result.entries) {
+        output.append(redis::MultiLen(2));
+        output.append(redis::BulkString(entry.key));
+        output.append(conn_->MultiBulkString(entry.values));
+      }
+    }
+
+    conn_->Reply(output);
+  }
+
+  /// Event callback: on EOF or error the timer is dropped and the connection
+  /// is unblocked; the event is then always forwarded to the connection.
+  void OnEvent(bufferevent *bev, int16_t events) {
+    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+      if (timer_ != nullptr) {
+        timer_.reset();
+      }
+      unblockAll();
+    }
+    conn_->OnEvent(bev, events);
+  }
+
+  /// Timer callback for an expired BLOCK timeout: replies with nil, unblocks
+  /// the connection and hands the bufferevent back to it.
+  void TimerCB(int, int16_t events) {
+    conn_->Reply(conn_->NilString());
+
+    timer_.reset();
+
+    unblockAll();
+
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+  }
+
+ private:
+  std::vector<std::string> streams_;
+  std::vector<StreamEntryID> ids_;
+  // latest_marks_[i] is true when the ID given for streams_[i] was ">".
+  std::vector<bool> latest_marks_;
+  // result_stream_idx_[k] is the index into streams_ of the k-th collected result.
+  std::vector<size_t> result_stream_idx_;
+  std::string group_name_;
+  std::string consumer_name_;
+  bool noack_ = false;
+
+  Server *srv_ = nullptr;
+  Connection *conn_ = nullptr;
+  UniqueEvent timer_;
+  uint64_t count_ = 0;
+  int64_t block_timeout_ = 0;
+  int blocked_default_count_ = 1000;
+  bool with_count_ = false;
+  bool block_ = false;
+
+  /// Calls Stream::RangeWithPending for every requested stream in STREAMS
+  /// order and collects the non-empty results, recording for each of them the
+  /// stream position it came from. Returns the first status that is neither
+  /// OK nor NotFound; reads already performed for earlier streams are kept.
+  rocksdb::Status readStreams(redis::Stream *stream_db, std::vector<StreamReadResult> *results) {
+    results->clear();
+    result_stream_idx_.clear();
+
+    for (size_t i = 0; i < streams_.size(); ++i) {
+      redis::StreamRangeOptions options;
+      options.reverse = false;
+      options.start = ids_[i];
+      options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+      options.with_count = with_count_;
+      options.count = count_;
+      options.exclude_start = true;
+      options.exclude_end = false;
+
+      std::vector<StreamEntry> entries;
+      auto s = stream_db->RangeWithPending(streams_[i], options, &entries, group_name_, consumer_name_, noack_,
+                                           latest_marks_[i]);
+      if (!s.ok() && !s.IsNotFound()) {
+        return s;
+      }
+
+      if (!entries.empty()) {
+        results->emplace_back(streams_[i], entries);
+        result_stream_idx_.push_back(i);
+      }
+    }
+
+    return rocksdb::Status::OK();
+  }
+
+  /// Tells whether the result at `result_pos` belongs to a stream that was
+  /// requested with ">". Falls back to the position itself when no mapping
+  /// was recorded, and to false when the index is out of range.
+  bool isLatestRead(size_t result_pos) const {
+    const size_t stream_idx =
+        result_pos < result_stream_idx_.size() ? result_stream_idx_[result_pos] : result_pos;
+    return stream_idx < latest_marks_.size() && latest_marks_[stream_idx];
+  }
+
+  void unblockAll() { srv_->UnblockOnStreams(streams_, conn_); }
+};
+
 class CommandXTrim : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -1197,6 +1494,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAdd>("xadd", 
-5, "write", 1, 1, 1),
                         MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandXRevRange>("xrevrange", -2, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandXRead>("xread", -4, "read-only", 0, 
0, 0),
+                        MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, 
"write", 0, 0, 0),
                         MakeCmdAttr<CommandXTrim>("xtrim", -4, "write 
no-dbsize-check", 1, 1, 1),
                         MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1, 
1, 1))
 
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index b03dc378..0bebd28b 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -250,6 +250,50 @@ StreamConsumerMetadata 
Stream::decodeStreamConsumerMetadataValue(const std::stri
   return consumer_metadata;
 }
 
+// Builds the internal key under which the PEL record of entry `id` in
+// `group_name` is stored. The sub-key layout is
+//   fixed64(group_name.size()) | group_name | fixed64(id.ms) | fixed64(id.seq)
+// and it is wrapped in an InternalKey together with the namespaced stream key
+// and the stream's metadata version.
+std::string Stream::internalPelKeyFromGroupAndEntryId(const std::string &ns_key, const StreamMetadata &metadata,
+                                                      const std::string &group_name, const StreamEntryID &id) {
+  std::string pel_sub_key;
+  PutFixed64(&pel_sub_key, group_name.size());
+  pel_sub_key.append(group_name);
+  PutFixed64(&pel_sub_key, id.ms);
+  PutFixed64(&pel_sub_key, id.seq);
+
+  const InternalKey internal_key(ns_key, pel_sub_key, metadata.version, storage_->IsSlotIdEncoded());
+  return internal_key.Encode();
+}
+
+// Inverse of internalPelKeyFromGroupAndEntryId: extracts the group name (into
+// `group_name`) and the entry ID (returned) from an encoded PEL internal key.
+//
+// The stored name length is clamped to the bytes actually present, so a
+// truncated or foreign key cannot make Slice::remove_prefix run past the end
+// of the sub-key. Well-formed keys decode exactly as before.
+StreamEntryID Stream::groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, std::string &group_name) {
+  InternalKey ikey(key, storage_->IsSlotIdEncoded());
+  Slice subkey = ikey.GetSubKey();
+
+  uint64_t group_name_len = 0;
+  GetFixed64(&subkey, &group_name_len);
+
+  const size_t name_len = group_name_len < subkey.size() ? static_cast<size_t>(group_name_len) : subkey.size();
+  // Copy only the name bytes instead of materialising the whole sub-key first.
+  group_name.assign(subkey.data(), name_len);
+  subkey.remove_prefix(name_len);
+
+  StreamEntryID entry_id;
+  GetFixed64(&subkey, &entry_id.ms);
+  GetFixed64(&subkey, &entry_id.seq);
+  return entry_id;
+}
+
+// Serialises a PEL record value as
+//   fixed64(last_delivery_time) | fixed64(last_delivery_count) |
+//   fixed64(consumer_name.size()) | consumer_name
+std::string Stream::encodeStreamPelEntryValue(const StreamPelEntry &pel_entry) {
+  const std::string &consumer = pel_entry.consumer_name;
+
+  std::string encoded;
+  PutFixed64(&encoded, pel_entry.last_delivery_time);
+  PutFixed64(&encoded, pel_entry.last_delivery_count);
+  PutFixed64(&encoded, consumer.size());
+  encoded.append(consumer);
+  return encoded;
+}
+
+// Parses a value produced by encodeStreamPelEntryValue: two fixed64 fields
+// (delivery time, delivery count) followed by a length-prefixed consumer name.
+// If fewer name bytes are present than the stored length announces, the name
+// is whatever remains.
+StreamPelEntry Stream::decodeStreamPelEntryValue(const std::string &value) {
+  rocksdb::Slice remaining(value);
+  StreamPelEntry decoded;
+
+  GetFixed64(&remaining, &decoded.last_delivery_time);
+  GetFixed64(&remaining, &decoded.last_delivery_count);
+
+  uint64_t stored_name_len = 0;
+  GetFixed64(&remaining, &stored_name_len);
+
+  const std::string tail = remaining.ToString();
+  decoded.consumer_name = tail.substr(0, stored_name_len);
+  return decoded;
+}
+
 StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
   InternalKey ikey(key, storage_->IsSlotIdEncoded());
   Slice subkey = ikey.GetSubKey();
@@ -372,13 +416,12 @@ rocksdb::Status Stream::DestroyGroup(const Slice 
&stream_name, const std::string
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
-rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const 
std::string &group_name,
-                                       const std::string &consumer_name, int 
*created_number) {
+rocksdb::Status Stream::createConsumerWithoutLock(const Slice &stream_name, 
const std::string &group_name,
+                                                  const std::string 
&consumer_name, int *created_number) {
   if (std::isdigit(consumer_name[0])) {
     return rocksdb::Status::InvalidArgument("consumer name cannot start with 
number");
   }
   std::string ns_key = AppendNamespacePrefix(stream_name);
-  LockGuard guard(storage_->GetLockManager(), ns_key);
   StreamMetadata metadata;
   rocksdb::Status s = GetMetadata(ns_key, &metadata);
   if (!s.ok() && !s.IsNotFound()) {
@@ -425,6 +468,13 @@ rocksdb::Status Stream::CreateConsumer(const Slice 
&stream_name, const std::stri
   return s;
 }
 
+// Public entry point for creating a consumer: takes the lock for the
+// namespaced stream key and delegates the actual work to
+// createConsumerWithoutLock, whose status is returned as-is.
+rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const std::string &group_name,
+                                       const std::string &consumer_name, int *created_number) {
+  const std::string lock_key = AppendNamespacePrefix(stream_name);
+  // Held until this function returns, i.e. for the whole delegated call.
+  LockGuard guard(storage_->GetLockManager(), lock_key);
+
+  rocksdb::Status status = createConsumerWithoutLock(stream_name, group_name, consumer_name, created_number);
+  return status;
+}
+
 rocksdb::Status Stream::GroupSetId(const Slice &stream_name, const std::string 
&group_name,
                                    const StreamXGroupCreateOptions &options) {
   std::string ns_key = AppendNamespacePrefix(stream_name);
@@ -927,6 +977,145 @@ rocksdb::Status Stream::Range(const Slice &stream_name, 
const StreamRangeOptions
   return range(ns_key, metadata, options, entries);
 }
 
+// Reads entries of `stream_name` on behalf of `consumer_name` in `group_name`.
+//
+// With `latest` set, entries after the group's last delivered ID are returned
+// (options.start is overwritten with that ID); unless `noack` is set each of
+// them gets a PEL record, and the group's last delivered ID is advanced.
+// Without `latest`, the group's PEL records starting at options.start that
+// belong to this consumer are returned and their delivery count/time updated;
+// a record whose stream entry no longer exists yields an entry without values.
+//
+// A missing stream results in OK with no entries. A missing group results in
+// InvalidArgument ("NOGROUP ..."). A missing consumer is created on the fly.
+// The whole operation runs under the lock of the namespaced stream key.
+rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOptions &options,
+                                         std::vector<StreamEntry> *entries, std::string &group_name,
+                                         std::string &consumer_name, bool noack, bool latest) {
+  entries->clear();
+
+  if (options.with_count && options.count == 0) {
+    return rocksdb::Status::OK();
+  }
+
+  if (options.exclude_start && options.start.IsMaximum()) {
+    return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
+  }
+
+  if (options.exclude_end && options.end.IsMinimum()) {
+    return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
+  }
+
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok()) {
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " + group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    int created_number = 0;
+    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, &created_number);
+    if (!s.ok()) {
+      return s;
+    }
+    // createConsumerWithoutLock is defined outside this function; if it
+    // rewrites the group metadata (e.g. a consumer counter), the copy read
+    // above is stale and the Put at the end would overwrite its update.
+    // Re-read the group value so the write-back starts from the current state.
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  StreamConsumerGroupMetadata consumergroup_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
+  // Re-read the consumer record: it may have just been created above.
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+  auto now = util::GetTimeStampMS();
+  consumer_metadata.last_idle = now;
+  consumer_metadata.last_active = now;
+
+  if (latest) {
+    // Deliver entries the group has not seen yet.
+    options.start = consumergroup_metadata.last_delivered_id;
+    s = range(ns_key, metadata, options, entries);
+    if (!s.ok()) {
+      return s;
+    }
+    StreamEntryID maxid = {0, 0};
+    for (const auto &entry : *entries) {
+      StreamEntryID id;
+      Status st = ParseStreamEntryID(entry.key, &id);
+      if (!st.IsOK()) {
+        return rocksdb::Status::InvalidArgument(st.Msg());
+      }
+      if (id > maxid) {
+        maxid = id;
+      }
+      if (!noack) {
+        std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
+        // This is the first delivery of the entry: record it as such, so that
+        // the re-delivery branch below keeps counting from 1 and the delivery
+        // time is never left at 0.
+        StreamPelEntry pel_entry = {0, 1, consumer_name};
+        pel_entry.last_delivery_time = now;
+        std::string pel_value = encodeStreamPelEntryValue(pel_entry);
+        batch->Put(stream_cf_handle_, pel_key, pel_value);
+        consumergroup_metadata.entries_read += 1;
+        consumergroup_metadata.pending_number += 1;
+        consumer_metadata.pending_number += 1;
+      }
+    }
+    if (maxid > consumergroup_metadata.last_delivered_id) {
+      consumergroup_metadata.last_delivered_id = maxid;
+    }
+  } else {
+    // Re-deliver this consumer's pending entries, scanning the group's PEL
+    // records from options.start (inclusive) up to the maximum ID (exclusive).
+    std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start);
+    std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());
+
+    rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+    LatestSnapShot ss(storage_);
+    read_options.snapshot = ss.GetSnapShot();
+    rocksdb::Slice upper_bound(end_key);
+    read_options.iterate_upper_bound = &upper_bound;
+    rocksdb::Slice lower_bound(prefix_key);
+    read_options.iterate_lower_bound = &lower_bound;
+
+    auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+    uint64_t count = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamPelEntry) continue;
+
+      std::string tmp_group_name;
+      StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+      if (tmp_group_name != group_name) continue;
+      StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
+      if (pel_entry.consumer_name != consumer_name) continue;
+
+      // NotFound is tolerated: the entry may have been deleted from the stream
+      // while still pending, in which case it is reported without values.
+      std::string raw_value;
+      rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id, &raw_value);
+      if (!st.ok() && !st.IsNotFound()) {
+        return st;
+      }
+      std::vector<std::string> values;
+      auto rv = DecodeRawStreamEntryValue(raw_value, &values);
+      if (!rv.IsOK()) {
+        return rocksdb::Status::InvalidArgument(rv.Msg());
+      }
+      entries->emplace_back(entry_id.ToString(), std::move(values));
+      pel_entry.last_delivery_count += 1;
+      pel_entry.last_delivery_time = now;
+      batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(pel_entry));
+      ++count;
+      // The limit only applies when COUNT was given; without it options.count
+      // is 0 and an unconditional check would stop after the first entry.
+      if (options.with_count && count >= options.count) break;
+    }
+  }
+  batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
+  batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
+  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
 rocksdb::Status Stream::Trim(const Slice &stream_name, const StreamTrimOptions 
&options, uint64_t *delete_cnt) {
   *delete_cnt = 0;
 
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 8f6367a7..4381e4ef 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -54,6 +54,9 @@ class Stream : public SubKeyScanner {
   rocksdb::Status GetConsumerInfo(const Slice &stream_name, const std::string 
&group_name,
                                   std::vector<std::pair<std::string, 
StreamConsumerMetadata>> &consumer_metadata);
   rocksdb::Status Range(const Slice &stream_name, const StreamRangeOptions 
&options, std::vector<StreamEntry> *entries);
+  rocksdb::Status RangeWithPending(const Slice &stream_name, 
StreamRangeOptions &options,
+                                   std::vector<StreamEntry> *entries, 
std::string &group_name,
+                                   std::string &consumer_name, bool noack, 
bool latest);
   rocksdb::Status Trim(const Slice &stream_name, const StreamTrimOptions 
&options, uint64_t *delete_cnt);
   rocksdb::Status GetMetadata(const Slice &stream_name, StreamMetadata 
*metadata);
   rocksdb::Status GetLastGeneratedID(const Slice &stream_name, StreamEntryID 
*id);
@@ -82,6 +85,13 @@ class Stream : public SubKeyScanner {
   std::string consumerNameFromInternalKey(rocksdb::Slice key) const;
   static std::string encodeStreamConsumerMetadataValue(const 
StreamConsumerMetadata &consumer_metadata);
   static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const 
std::string &value);
+  rocksdb::Status createConsumerWithoutLock(const Slice &stream_name, const 
std::string &group_name,
+                                            const std::string &consumer_name, 
int *created_number);
+  std::string internalPelKeyFromGroupAndEntryId(const std::string &ns_key, 
const StreamMetadata &metadata,
+                                                const std::string &group_name, 
const StreamEntryID &id);
+  StreamEntryID groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, 
std::string &group_name);
+  static std::string encodeStreamPelEntryValue(const StreamPelEntry 
&pel_entry);
+  static StreamPelEntry decodeStreamPelEntryValue(const std::string &value);
   StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const;
 };
 
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 7ff62e6e..60d54d23 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -182,6 +182,12 @@ enum class StreamSubkeyType {
   StreamPelEntry = 3,
 };
 
+// Value stored for one pending (delivered but not yet acknowledged) entry of a
+// consumer group. The scalar members are zero-initialised so that a
+// default-constructed record never carries indeterminate values, e.g. when
+// decoding a truncated value.
+struct StreamPelEntry {
+  // Timestamp of the most recent delivery, as returned by util::GetTimeStampMS().
+  uint64_t last_delivery_time = 0;
+  // Number of times the entry has been delivered.
+  uint64_t last_delivery_count = 0;
+  // Name of the consumer the entry is pending for.
+  std::string consumer_name;
+};
+
 struct StreamInfo {
   uint64_t size;
   uint64_t entries_added;
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 7bbce02b..d74ff2b0 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1000,6 +1000,144 @@ func TestStreamOffset(t *testing.T) {
                        Streams: []string{streamName, "0"},
                }).Err())
        })
+
+       t.Run("XREADGROUP with different kinds of commands", func(t *testing.T) 
{
+               streamName := "mystream"
+               groupName := "mygroup"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+               consumerName := "myconsumer"
+               r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               require.Equal(t, []redis.XStream{{
+                       Stream:   streamName,
+                       Messages: []redis.XMessage{{ID: "1-0", Values: 
map[string]interface{}{"field1": "data1"}}},
+               }}, r)
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "2-0",
+                       Values: []string{"field2", "data2"},
+               }).Err())
+               r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               require.Equal(t, []redis.XStream{{
+                       Stream:   streamName,
+                       Messages: []redis.XMessage{{ID: "2-0", Values: 
map[string]interface{}{"field2": "data2"}}},
+               }}, r)
+
+               r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, "0"},
+                       Count:    2,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               require.Equal(t, []redis.XStream{{
+                       Stream: streamName,
+                       Messages: []redis.XMessage{{ID: "1-0", Values: 
map[string]interface{}{"field1": "data1"}},
+                               {ID: "2-0", Values: 
map[string]interface{}{"field2": "data2"}}},
+               }}, r)
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "3-0",
+                       Values: []string{"field3", "data3"},
+               }).Err())
+               r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    true,
+               }).Result()
+               require.NoError(t, err)
+               require.Equal(t, []redis.XStream{{
+                       Stream:   streamName,
+                       Messages: []redis.XMessage{{ID: "3-0", Values: 
map[string]interface{}{"field3": "data3"}}},
+               }}, r)
+               r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, "0"},
+                       Count:    2,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               require.Equal(t, []redis.XStream{{
+                       Stream: streamName,
+                       Messages: []redis.XMessage{{ID: "1-0", Values: 
map[string]interface{}{"field1": "data1"}},
+                               {ID: "2-0", Values: 
map[string]interface{}{"field2": "data2"}}},
+               }}, r)
+
+               c := srv.NewClient()
+               defer func() { require.NoError(t, c.Close()) }()
+               ch := make(chan []redis.XStream)
+               go func() {
+                       ch <- c.XReadGroup(ctx, &redis.XReadGroupArgs{
+                               Group:    groupName,
+                               Consumer: consumerName,
+                               Streams:  []string{streamName, ">"},
+                               Count:    2,
+                               Block:    10 * time.Second,
+                               NoAck:    false,
+                       }).Val()
+               }()
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "4-0",
+                       Values: []string{"field4", "data4"},
+               }).Err())
+               r = <-ch
+               require.Equal(t, []redis.XStream{{
+                       Stream:   streamName,
+                       Messages: []redis.XMessage{{ID: "4-0", Values: 
map[string]interface{}{"field4": "data4"}}},
+               }}, r)
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "5-0",
+                       Values: []string{"field5", "data5"},
+               }).Err())
+               require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Err())
+               require.NoError(t, rdb.XDel(ctx, streamName, "5-0").Err())
+               r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, "5"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               require.Equal(t, []redis.XStream{{
+                       Stream:   streamName,
+                       Messages: []redis.XMessage{{ID: "5-0", Values: 
map[string]interface{}(nil)}},
+               }}, r)
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to