This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 79ad8a52 Add the support of the XREADGROUP command (#2120)
79ad8a52 is described below
commit 79ad8a52840af5fe6a046a19b7329a84855452a0
Author: Hauru <[email protected]>
AuthorDate: Fri Mar 1 16:19:14 2024 +0800
Add the support of the XREADGROUP command (#2120)
---
src/commands/cmd_stream.cc | 298 +++++++++++++++++++++++++++
src/types/redis_stream.cc | 195 +++++++++++++++++-
src/types/redis_stream.h | 10 +
src/types/redis_stream_base.h | 6 +
tests/gocase/unit/type/stream/stream_test.go | 138 +++++++++++++
5 files changed, 644 insertions(+), 3 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 76e2146b..04ea2942 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1036,6 +1036,303 @@ class CommandXRead : public Commander,
void unblockAll() { srv_->UnblockOnStreams(streams_, conn_); }
};
+class CommandXReadGroup : public Commander,
+ private EvbufCallbackBase<CommandXReadGroup, false>,
+ private EventCallbackBase<CommandXReadGroup> {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ size_t streams_word_idx = 0;
+ if (util::ToLower(args[1]) != "group") {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+ group_name_ = args[2];
+ consumer_name_ = args[3];
+
+ for (size_t i = 4; i < args.size();) {
+ auto arg = util::ToLower(args[i]);
+
+ if (arg == "streams") {
+ streams_word_idx = i;
+ break;
+ }
+
+ if (arg == "count") {
+ if (i + 1 >= args.size()) {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+
+ with_count_ = true;
+
+ auto parse_result = ParseInt<uint64_t>(args[i + 1], 10);
+ if (!parse_result) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+
+ count_ = *parse_result;
+ i += 2;
+ continue;
+ }
+
+ if (arg == "block") {
+ if (i + 1 >= args.size()) {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+
+ block_ = true;
+
+ auto parse_result = ParseInt<int64_t>(args[i + 1], 10);
+ if (!parse_result) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+
+ if (*parse_result < 0) {
+ return {Status::RedisParseErr, errTimeoutIsNegative};
+ }
+
+ block_timeout_ = *parse_result;
+ i += 2;
+ continue;
+ }
+
+ if (arg == "noack") {
+ noack_ = true;
+ }
+
+ ++i;
+ }
+
+ if (streams_word_idx == 0) {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+
+ if ((args.size() - streams_word_idx - 1) % 2 != 0) {
+ return {Status::RedisParseErr, errUnbalancedStreamList};
+ }
+
+ size_t number_of_streams = (args.size() - streams_word_idx - 1) / 2;
+
+ for (size_t i = streams_word_idx + 1; i <= streams_word_idx +
number_of_streams; ++i) {
+ streams_.push_back(args[i]);
+ const auto &id_str = args[i + number_of_streams];
+ bool get_latest = id_str == ">";
+ latest_marks_.push_back(get_latest);
+ if (!get_latest) {
+ block_ = false;
+ }
+ StreamEntryID id;
+ if (!get_latest) {
+ auto s = ParseStreamEntryID(id_str, &id);
+ if (!s.IsOK()) {
+ return s;
+ }
+ }
+ ids_.push_back(id);
+ }
+
+ return Status::OK();
+ }
+
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {
+ redis::Stream stream_db(srv->storage, conn->GetNamespace());
+
+ std::vector<redis::StreamReadResult> results;
+
+ for (size_t i = 0; i < streams_.size(); ++i) {
+ redis::StreamRangeOptions options;
+ options.reverse = false;
+ options.start = ids_[i];
+ options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+ options.with_count = with_count_;
+ options.count = count_;
+ options.exclude_start = true;
+ options.exclude_end = false;
+
+ std::vector<StreamEntry> result;
+ auto s = stream_db.RangeWithPending(streams_[i], options, &result,
group_name_, consumer_name_, noack_,
+ latest_marks_[i]);
+ if (!s.ok() && !s.IsNotFound()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ if (result.size() > 0) {
+ results.emplace_back(streams_[i], result);
+ }
+ }
+
+ if (block_ && results.empty()) {
+ if (conn->IsInExec()) {
+ *output = redis::MultiLen(-1);
+ return Status::OK(); // No blocking in multi-exec
+ }
+
+ return BlockingRead(srv, conn, &stream_db);
+ }
+
+ if (!block_ && results.empty()) {
+ *output = redis::MultiLen(-1);
+ return Status::OK();
+ }
+
+ return SendResults(conn, output, results);
+ }
+
+ Status SendResults(Connection *conn, std::string *output, const
std::vector<StreamReadResult> &results) {
+ output->append(redis::MultiLen(results.size()));
+ int id = 0;
+ for (const auto &result : results) {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(result.name));
+ output->append(redis::MultiLen(result.entries.size()));
+ for (const auto &entry : result.entries) {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(entry.key));
+ if (entry.values.size() == 0 && !latest_marks_[id]) {
+ output->append(conn->NilString());
+ continue;
+ }
+ output->append(conn->MultiBulkString(entry.values));
+ }
+ ++id;
+ }
+
+ return Status::OK();
+ }
+
+ Status BlockingRead(Server *srv, Connection *conn, redis::Stream *stream_db)
{
+ if (!with_count_) {
+ with_count_ = true;
+ count_ = blocked_default_count_;
+ }
+
+ srv_ = srv;
+ conn_ = conn;
+
+ srv_->BlockOnStreams(streams_, ids_, conn_);
+
+ auto bev = conn->GetBufferEvent();
+ SetCB(bev);
+
+ if (block_timeout_ > 0) {
+ timer_.reset(NewTimer(bufferevent_get_base(bev)));
+ timeval tm;
+ if (block_timeout_ > 1000) {
+ tm.tv_sec = block_timeout_ / 1000;
+ tm.tv_usec = (block_timeout_ % 1000) * 1000;
+ } else {
+ tm.tv_sec = 0;
+ tm.tv_usec = block_timeout_ * 1000;
+ }
+
+ evtimer_add(timer_.get(), &tm);
+ }
+
+ return {Status::BlockingCmd};
+ }
+
+ void OnWrite(bufferevent *bev) {
+ if (timer_ != nullptr) {
+ timer_.reset();
+ }
+
+ unblockAll();
+ conn_->SetCB(bev);
+ bufferevent_enable(bev, EV_READ);
+
+ redis::Stream stream_db(srv_->storage, conn_->GetNamespace());
+
+ std::vector<StreamReadResult> results;
+
+ for (size_t i = 0; i < streams_.size(); ++i) {
+ redis::StreamRangeOptions options;
+ options.reverse = false;
+ options.start = ids_[i];
+ options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+ options.with_count = with_count_;
+ options.count = count_;
+ options.exclude_start = true;
+ options.exclude_end = false;
+
+ std::vector<StreamEntry> result;
+ auto s = stream_db.RangeWithPending(streams_[i], options, &result,
group_name_, consumer_name_, noack_,
+ latest_marks_[i]);
+ if (!s.ok() && !s.IsNotFound()) {
+ conn_->Reply(redis::Error("ERR " + s.ToString()));
+ return;
+ }
+
+ if (result.size() > 0) {
+ results.emplace_back(streams_[i], result);
+ }
+ }
+
+ if (results.empty()) {
+ conn_->Reply(redis::MultiLen(-1));
+ }
+
+ SendReply(results);
+ }
+
+ void SendReply(const std::vector<StreamReadResult> &results) {
+ std::string output;
+
+ output.append(redis::MultiLen(results.size()));
+
+ for (const auto &result : results) {
+ output.append(redis::MultiLen(2));
+ output.append(redis::BulkString(result.name));
+ output.append(redis::MultiLen(result.entries.size()));
+ for (const auto &entry : result.entries) {
+ output.append(redis::MultiLen(2));
+ output.append(redis::BulkString(entry.key));
+ output.append(conn_->MultiBulkString(entry.values));
+ }
+ }
+
+ conn_->Reply(output);
+ }
+
+ void OnEvent(bufferevent *bev, int16_t events) {
+ if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+ if (timer_ != nullptr) {
+ timer_.reset();
+ }
+ unblockAll();
+ }
+ conn_->OnEvent(bev, events);
+ }
+
+ void TimerCB(int, int16_t events) {
+ conn_->Reply(conn_->NilString());
+
+ timer_.reset();
+
+ unblockAll();
+
+ auto bev = conn_->GetBufferEvent();
+ conn_->SetCB(bev);
+ bufferevent_enable(bev, EV_READ);
+ }
+
+ private:
+ std::vector<std::string> streams_;
+ std::vector<StreamEntryID> ids_;
+ std::vector<bool> latest_marks_;
+ std::string group_name_;
+ std::string consumer_name_;
+ bool noack_ = false;
+
+ Server *srv_ = nullptr;
+ Connection *conn_ = nullptr;
+ UniqueEvent timer_;
+ uint64_t count_ = 0;
+ int64_t block_timeout_ = 0;
+ int blocked_default_count_ = 1000;
+ bool with_count_ = false;
+ bool block_ = false;
+
+ void unblockAll() { srv_->UnblockOnStreams(streams_, conn_); }
+};
+
class CommandXTrim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
@@ -1197,6 +1494,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAdd>("xadd",
-5, "write", 1, 1, 1),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only",
1, 1, 1),
MakeCmdAttr<CommandXRevRange>("xrevrange", -2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only", 0,
0, 0),
+ MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7,
"write", 0, 0, 0),
MakeCmdAttr<CommandXTrim>("xtrim", -4, "write
no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1,
1, 1))
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index b03dc378..0bebd28b 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -250,6 +250,50 @@ StreamConsumerMetadata
Stream::decodeStreamConsumerMetadataValue(const std::stri
return consumer_metadata;
}
+std::string Stream::internalPelKeyFromGroupAndEntryId(const std::string
&ns_key, const StreamMetadata &metadata,
+ const std::string
&group_name, const StreamEntryID &id) {
+ std::string sub_key;
+ PutFixed64(&sub_key, group_name.size());
+ sub_key += group_name;
+ PutFixed64(&sub_key, id.ms);
+ PutFixed64(&sub_key, id.seq);
+ std::string entry_key = InternalKey(ns_key, sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ return entry_key;
+}
+
+StreamEntryID Stream::groupAndEntryIdFromPelInternalKey(rocksdb::Slice key,
std::string &group_name) {
+ InternalKey ikey(key, storage_->IsSlotIdEncoded());
+ Slice subkey = ikey.GetSubKey();
+ uint64_t group_name_len = 0;
+ GetFixed64(&subkey, &group_name_len);
+ group_name = subkey.ToString().substr(0, group_name_len);
+ subkey.remove_prefix(group_name_len);
+ StreamEntryID entry_id;
+ GetFixed64(&subkey, &entry_id.ms);
+ GetFixed64(&subkey, &entry_id.seq);
+ return entry_id;
+}
+
+std::string Stream::encodeStreamPelEntryValue(const StreamPelEntry &pel_entry)
{
+ std::string dst;
+ PutFixed64(&dst, pel_entry.last_delivery_time);
+ PutFixed64(&dst, pel_entry.last_delivery_count);
+ PutFixed64(&dst, pel_entry.consumer_name.size());
+ dst += pel_entry.consumer_name;
+ return dst;
+}
+
+StreamPelEntry Stream::decodeStreamPelEntryValue(const std::string &value) {
+ StreamPelEntry pel_entry;
+ rocksdb::Slice input(value);
+ GetFixed64(&input, &pel_entry.last_delivery_time);
+ GetFixed64(&input, &pel_entry.last_delivery_count);
+ uint64_t consumer_name_len = 0;
+ GetFixed64(&input, &consumer_name_len);
+ pel_entry.consumer_name = input.ToString().substr(0, consumer_name_len);
+ return pel_entry;
+}
+
StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
@@ -372,13 +416,12 @@ rocksdb::Status Stream::DestroyGroup(const Slice
&stream_name, const std::string
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
-rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const
std::string &group_name,
- const std::string &consumer_name, int
*created_number) {
+rocksdb::Status Stream::createConsumerWithoutLock(const Slice &stream_name,
const std::string &group_name,
+ const std::string
&consumer_name, int *created_number) {
if (std::isdigit(consumer_name[0])) {
return rocksdb::Status::InvalidArgument("consumer name cannot start with
number");
}
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
rocksdb::Status s = GetMetadata(ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
@@ -425,6 +468,13 @@ rocksdb::Status Stream::CreateConsumer(const Slice
&stream_name, const std::stri
return s;
}
+rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const
std::string &group_name,
+ const std::string &consumer_name, int
*created_number) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ return createConsumerWithoutLock(stream_name, group_name, consumer_name,
created_number);
+}
+
rocksdb::Status Stream::GroupSetId(const Slice &stream_name, const std::string
&group_name,
const StreamXGroupCreateOptions &options) {
std::string ns_key = AppendNamespacePrefix(stream_name);
@@ -927,6 +977,145 @@ rocksdb::Status Stream::Range(const Slice &stream_name,
const StreamRangeOptions
return range(ns_key, metadata, options, entries);
}
+rocksdb::Status Stream::RangeWithPending(const Slice &stream_name,
StreamRangeOptions &options,
+ std::vector<StreamEntry> *entries,
std::string &group_name,
+ std::string &consumer_name, bool
noack, bool latest) {
+ entries->clear();
+
+ if (options.with_count && options.count == 0) {
+ return rocksdb::Status::OK();
+ }
+
+ if (options.exclude_start && options.start.IsMaximum()) {
+ return rocksdb::Status::InvalidArgument("invalid start ID for the
interval");
+ }
+
+ if (options.exclude_end && options.end.IsMinimum()) {
+ return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
+ }
+
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) {
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ int created_number = 0;
+ s = createConsumerWithoutLock(stream_name, group_name, consumer_name,
&created_number);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+
+ StreamConsumerGroupMetadata consumergroup_metadata =
decodeStreamConsumerGroupMetadataValue(get_group_value);
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ StreamConsumerMetadata consumer_metadata =
decodeStreamConsumerMetadataValue(get_consumer_value);
+ auto now = util::GetTimeStampMS();
+ consumer_metadata.last_idle = now;
+ consumer_metadata.last_active = now;
+
+ if (latest) {
+ options.start = consumergroup_metadata.last_delivered_id;
+ s = range(ns_key, metadata, options, entries);
+ if (!s.ok()) {
+ return s;
+ }
+ StreamEntryID maxid = {0, 0};
+ for (const auto &entry : *entries) {
+ StreamEntryID id;
+ Status st = ParseStreamEntryID(entry.key, &id);
+ if (!st.IsOK()) {
+ return rocksdb::Status::InvalidArgument(st.Msg());
+ }
+ if (id > maxid) {
+ maxid = id;
+ }
+ if (!noack) {
+ std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key,
metadata, group_name, id);
+ StreamPelEntry pel_entry = {0, 0, consumer_name};
+ std::string pel_value = encodeStreamPelEntryValue(pel_entry);
+ batch->Put(stream_cf_handle_, pel_key, pel_value);
+ consumergroup_metadata.entries_read += 1;
+ consumergroup_metadata.pending_number += 1;
+ consumer_metadata.pending_number += 1;
+ }
+ }
+ if (maxid > consumergroup_metadata.last_delivered_id) {
+ consumergroup_metadata.last_delivered_id = maxid;
+ }
+ } else {
+ std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key,
metadata, group_name, options.start);
+ std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, StreamEntryID::Maximum());
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(end_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options,
stream_cf_handle_);
+ uint64_t count = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry)
{
+ std::string tmp_group_name;
+ StreamEntryID entry_id =
groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+ if (tmp_group_name != group_name) continue;
+ StreamPelEntry pel_entry =
decodeStreamPelEntryValue(iter->value().ToString());
+ if (pel_entry.consumer_name != consumer_name) continue;
+ std::string raw_value;
+ rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id,
&raw_value);
+ if (!st.ok() && !st.IsNotFound()) {
+ return st;
+ }
+ std::vector<std::string> values;
+ auto rv = DecodeRawStreamEntryValue(raw_value, &values);
+ if (!rv.IsOK()) {
+ return rocksdb::Status::InvalidArgument(rv.Msg());
+ }
+ entries->emplace_back(entry_id.ToString(), std::move(values));
+ pel_entry.last_delivery_count += 1;
+ pel_entry.last_delivery_time = now;
+ batch->Put(stream_cf_handle_, iter->key(),
encodeStreamPelEntryValue(pel_entry));
+ ++count;
+ if (count >= options.count) break;
+ }
+ }
+ }
+ batch->Put(stream_cf_handle_, group_key,
encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
+ batch->Put(stream_cf_handle_, consumer_key,
encodeStreamConsumerMetadataValue(consumer_metadata));
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
rocksdb::Status Stream::Trim(const Slice &stream_name, const StreamTrimOptions
&options, uint64_t *delete_cnt) {
*delete_cnt = 0;
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 8f6367a7..4381e4ef 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -54,6 +54,9 @@ class Stream : public SubKeyScanner {
rocksdb::Status GetConsumerInfo(const Slice &stream_name, const std::string
&group_name,
std::vector<std::pair<std::string,
StreamConsumerMetadata>> &consumer_metadata);
rocksdb::Status Range(const Slice &stream_name, const StreamRangeOptions
&options, std::vector<StreamEntry> *entries);
+ rocksdb::Status RangeWithPending(const Slice &stream_name,
StreamRangeOptions &options,
+ std::vector<StreamEntry> *entries,
std::string &group_name,
+ std::string &consumer_name, bool noack,
bool latest);
rocksdb::Status Trim(const Slice &stream_name, const StreamTrimOptions
&options, uint64_t *delete_cnt);
rocksdb::Status GetMetadata(const Slice &stream_name, StreamMetadata
*metadata);
rocksdb::Status GetLastGeneratedID(const Slice &stream_name, StreamEntryID
*id);
@@ -82,6 +85,13 @@ class Stream : public SubKeyScanner {
std::string consumerNameFromInternalKey(rocksdb::Slice key) const;
static std::string encodeStreamConsumerMetadataValue(const
StreamConsumerMetadata &consumer_metadata);
static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const
std::string &value);
+ rocksdb::Status createConsumerWithoutLock(const Slice &stream_name, const
std::string &group_name,
+ const std::string &consumer_name,
int *created_number);
+ std::string internalPelKeyFromGroupAndEntryId(const std::string &ns_key,
const StreamMetadata &metadata,
+ const std::string &group_name,
const StreamEntryID &id);
+ StreamEntryID groupAndEntryIdFromPelInternalKey(rocksdb::Slice key,
std::string &group_name);
+ static std::string encodeStreamPelEntryValue(const StreamPelEntry
&pel_entry);
+ static StreamPelEntry decodeStreamPelEntryValue(const std::string &value);
StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const;
};
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 7ff62e6e..60d54d23 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -182,6 +182,12 @@ enum class StreamSubkeyType {
StreamPelEntry = 3,
};
+struct StreamPelEntry {
+ uint64_t last_delivery_time;
+ uint64_t last_delivery_count;
+ std::string consumer_name;
+};
+
struct StreamInfo {
uint64_t size;
uint64_t entries_added;
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 7bbce02b..d74ff2b0 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1000,6 +1000,144 @@ func TestStreamOffset(t *testing.T) {
Streams: []string{streamName, "0"},
}).Err())
})
+
+ t.Run("XREADGROUP with different kinds of commands", func(t *testing.T)
{
+ streamName := "mystream"
+ groupName := "mygroup"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "1-0",
+ Values: []string{"field1", "data1"},
+ }).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ consumerName := "myconsumer"
+ r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Result()
+ require.NoError(t, err)
+ require.Equal(t, []redis.XStream{{
+ Stream: streamName,
+ Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}}},
+ }}, r)
+
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "2-0",
+ Values: []string{"field2", "data2"},
+ }).Err())
+ r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Result()
+ require.NoError(t, err)
+ require.Equal(t, []redis.XStream{{
+ Stream: streamName,
+ Messages: []redis.XMessage{{ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
+ }}, r)
+
+ r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, "0"},
+ Count: 2,
+ NoAck: false,
+ }).Result()
+ require.NoError(t, err)
+ require.Equal(t, []redis.XStream{{
+ Stream: streamName,
+ Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}},
+ {ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
+ }}, r)
+
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "3-0",
+ Values: []string{"field3", "data3"},
+ }).Err())
+ r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: true,
+ }).Result()
+ require.NoError(t, err)
+ require.Equal(t, []redis.XStream{{
+ Stream: streamName,
+ Messages: []redis.XMessage{{ID: "3-0", Values:
map[string]interface{}{"field3": "data3"}}},
+ }}, r)
+ r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, "0"},
+ Count: 2,
+ NoAck: false,
+ }).Result()
+ require.NoError(t, err)
+ require.Equal(t, []redis.XStream{{
+ Stream: streamName,
+ Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}},
+ {ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
+ }}, r)
+
+ c := srv.NewClient()
+ defer func() { require.NoError(t, c.Close()) }()
+ ch := make(chan []redis.XStream)
+ go func() {
+ ch <- c.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 2,
+ Block: 10 * time.Second,
+ NoAck: false,
+ }).Val()
+ }()
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "4-0",
+ Values: []string{"field4", "data4"},
+ }).Err())
+ r = <-ch
+ require.Equal(t, []redis.XStream{{
+ Stream: streamName,
+ Messages: []redis.XMessage{{ID: "4-0", Values:
map[string]interface{}{"field4": "data4"}}},
+ }}, r)
+
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "5-0",
+ Values: []string{"field5", "data5"},
+ }).Err())
+ require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Err())
+ require.NoError(t, rdb.XDel(ctx, streamName, "5-0").Err())
+ r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, "5"},
+ Count: 1,
+ NoAck: false,
+ }).Result()
+ require.NoError(t, err)
+ require.Equal(t, []redis.XStream{{
+ Stream: streamName,
+ Messages: []redis.XMessage{{ID: "5-0", Values:
map[string]interface{}(nil)}},
+ }}, r)
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {