This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new be983181 Add support of the XINFO GROUPS and CONSUMERS (#1870)
be983181 is described below
commit be983181ae3f751486706409e662cbc9322682c8
Author: Hauru <[email protected]>
AuthorDate: Sun Nov 12 16:47:59 2023 +0800
Add support of the XINFO GROUPS and CONSUMERS (#1870)
Co-authored-by: hulk <[email protected]>
---
src/commands/cmd_stream.cc | 92 ++++++++++++++-
src/types/redis_stream.cc | 166 ++++++++++++++++++++++++++-
src/types/redis_stream.h | 7 ++
src/types/redis_stream_base.h | 9 +-
tests/gocase/unit/type/stream/stream_test.go | 46 ++++++++
5 files changed, 312 insertions(+), 8 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 45ed098a..0f8d6b96 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -26,6 +26,7 @@
#include "error_constants.h"
#include "event_util.h"
#include "server/server.h"
+#include "time_util.h"
#include "types/redis_stream.h"
namespace redis {
@@ -386,7 +387,7 @@ class CommandXInfo : public Commander {
Status Parse(const std::vector<std::string> &args) override {
auto val = util::ToLower(args[1]);
if (val == "stream" && args.size() >= 2) {
- stream_ = true;
+ subcommand_ = "stream";
if (args.size() > 3 && util::ToLower(args[3]) == "full") {
full_ = true;
@@ -400,20 +401,35 @@ class CommandXInfo : public Commander {
count_ = *parse_result;
}
+ } else if (val == "groups" && args.size() == 3) {
+ subcommand_ = "groups";
+ } else if (val == "consumers" && args.size() == 4) {
+ subcommand_ = "consumers";
+ } else {
+ return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
}
+
return Status::OK();
}
Status Execute(Server *srv, Connection *conn, std::string *output) override {
- if (stream_) {
+ if (subcommand_ == "stream") {
return getStreamInfo(srv, conn, output);
}
+
+ if (subcommand_ == "groups") {
+ return getGroupInfo(srv, conn, output);
+ }
+
+ if (subcommand_ == "consumers") {
+ return getConsumerInfo(srv, conn, output);
+ }
return Status::OK();
}
private:
uint64_t count_ = 10; // default Redis value
- bool stream_ = false;
+ std::string subcommand_;
bool full_ = false;
Status getStreamInfo(Server *srv, Connection *conn, std::string *output) {
@@ -472,6 +488,76 @@ class CommandXInfo : public Commander {
return Status::OK();
}
+
+ Status getGroupInfo(Server *srv, Connection *conn, std::string *output) {
+ redis::Stream stream_db(srv->storage, conn->GetNamespace());
+ std::vector<std::pair<std::string, StreamConsumerGroupMetadata>>
result_vector;
+ auto s = stream_db.GetGroupInfo(args_[2], result_vector);
+ if (!s.ok() && !s.IsNotFound()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ if (s.IsNotFound()) {
+ return {Status::RedisExecErr, errNoSuchKey};
+ }
+
+ output->append(redis::MultiLen(result_vector.size()));
+ for (auto const &it : result_vector) {
+ output->append(redis::MultiLen(12));
+ output->append(redis::BulkString("name"));
+ output->append(redis::BulkString(it.first));
+ output->append(redis::BulkString("consumers"));
+ output->append(redis::Integer(it.second.consumer_number));
+ output->append(redis::BulkString("pending"));
+ output->append(redis::Integer(it.second.pending_number));
+ output->append(redis::BulkString("last-delivered-id"));
+
output->append(redis::BulkString(it.second.last_delivered_id.ToString()));
+ output->append(redis::BulkString("entries-read"));
+ if (it.second.entries_read == -1) {
+ output->append(redis::NilString());
+ } else {
+ output->append(redis::Integer(it.second.entries_read));
+ }
+ output->append(redis::BulkString("lag"));
+ if (it.second.lag == UINT64_MAX) {
+ output->append(redis::NilString());
+ } else {
+ output->append(redis::Integer(it.second.lag));
+ }
+ }
+
+ return Status::OK();
+ }
+
+ Status getConsumerInfo(Server *srv, Connection *conn, std::string *output) {
+ redis::Stream stream_db(srv->storage, conn->GetNamespace());
+ std::vector<std::pair<std::string, redis::StreamConsumerMetadata>>
result_vector;
+ auto s = stream_db.GetConsumerInfo(args_[2], args_[3], result_vector);
+
+ if (!s.ok() && !s.IsNotFound()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ if (s.IsNotFound()) {
+ return {Status::RedisExecErr, errNoSuchKey};
+ }
+
+ output->append(redis::MultiLen(result_vector.size()));
+ auto now = util::GetTimeStampMS();
+ for (auto const &it : result_vector) {
+ output->append(redis::MultiLen(8));
+ output->append(redis::BulkString("name"));
+ output->append(redis::BulkString(it.first));
+ output->append(redis::BulkString("pending"));
+ output->append(redis::Integer(it.second.pending_number));
+ output->append(redis::BulkString("idle"));
+ output->append(redis::Integer(now - it.second.last_idle));
+ output->append(redis::BulkString("inactive"));
+ output->append(redis::Integer(now - it.second.last_active));
+ }
+
+ return Status::OK();
+ }
};
class CommandXRange : public Commander {
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index ddf63c91..bc050ad0 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -31,7 +31,7 @@
namespace redis {
-constexpr const char *consumerGroupMetadataDelimiter = "METADATA";
+std::string_view consumerGroupMetadataDelimiter = "METADATA";
const char *errSetEntryIdSmallerThanLastGenerated =
"The ID specified in XSETID is smaller than the target stream top item";
const char *errEntriesAddedSmallerThanStreamSize =
@@ -181,9 +181,7 @@ std::string Stream::groupNameFromInternalKey(rocksdb::Slice
key) const {
uint64_t len = 0;
GetFixed64(&group_name_metadata, &len);
std::string group_name;
- if (len <= group_name_metadata.size() -
strlen(consumerGroupMetadataDelimiter)) {
- group_name = group_name_metadata.ToString().substr(0, len);
- }
+ group_name = group_name_metadata.ToString().substr(0, len);
return group_name;
}
@@ -224,6 +222,17 @@ std::string Stream::internalKeyFromConsumerName(const
std::string &ns_key, const
return entry_key;
}
+std::string Stream::consumerNameFromInternalKey(rocksdb::Slice key) const {
+ InternalKey ikey(key, storage_->IsSlotIdEncoded());
+ Slice subkey = ikey.GetSubKey();
+ uint64_t group_name_len = 0;
+ GetFixed64(&subkey, &group_name_len);
+ subkey.remove_prefix(group_name_len);
+ uint64_t consumer_name_len = 0;
+ GetFixed64(&subkey, &consumer_name_len);
+ return subkey.ToString().substr(0, consumer_name_len);
+}
+
std::string Stream::encodeStreamConsumerMetadataValue(const
StreamConsumerMetadata &consumer_metadata) {
std::string dst;
PutFixed64(&dst, consumer_metadata.pending_number);
@@ -232,6 +241,35 @@ std::string
Stream::encodeStreamConsumerMetadataValue(const StreamConsumerMetada
return dst;
}
+StreamConsumerMetadata Stream::decodeStreamConsumerMetadataValue(const
std::string &value) {
+ StreamConsumerMetadata consumer_metadata;
+ rocksdb::Slice input(value);
+ GetFixed64(&input, &consumer_metadata.pending_number);
+ GetFixed64(&input, &consumer_metadata.last_idle);
+ GetFixed64(&input, &consumer_metadata.last_active);
+ return consumer_metadata;
+}
+
+StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) {
+ InternalKey ikey(key, storage_->IsSlotIdEncoded());
+ Slice subkey = ikey.GetSubKey();
+ const size_t entry_id_size = sizeof(StreamEntryID);
+ if (subkey.size() <= entry_id_size) {
+ return StreamSubkeyType::StreamEntry;
+ }
+ uint64_t group_name_len = 0;
+ GetFixed64(&subkey, &group_name_len);
+ std::string without_group_name = subkey.ToString().substr(group_name_len);
+ const size_t metadata_delimiter_size = consumerGroupMetadataDelimiter.size();
+ if (without_group_name.size() <= metadata_delimiter_size) {
+ return StreamSubkeyType::StreamConsumerGroupMetadata;
+ }
+ if (without_group_name.size() <= entry_id_size) {
+ return StreamSubkeyType::StreamPelEntry;
+ }
+ return StreamSubkeyType::StreamConsumerMetadata;
+}
+
rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
const std::string &group_name) {
if (std::isdigit(group_name[0])) {
@@ -737,6 +775,126 @@ rocksdb::Status Stream::GetStreamInfo(const
rocksdb::Slice &stream_name, bool fu
return rocksdb::Status::OK();
}
+static bool StreamRangeHasTombstones(const StreamMetadata &metadata,
StreamEntryID start_id) {
+ StreamEntryID end_id = StreamEntryID{UINT64_MAX, UINT64_MAX};
+ if (metadata.size == 0 || metadata.max_deleted_entry_id == StreamEntryID{0,
0}) {
+ return false;
+ }
+ if (metadata.first_entry_id > metadata.max_deleted_entry_id) {
+ return false;
+ }
+ return (start_id <= metadata.max_deleted_entry_id &&
metadata.max_deleted_entry_id <= end_id);
+}
+
+static int64_t StreamEstimateDistanceFromFirstEverEntry(const StreamMetadata
&metadata, StreamEntryID id) {
+ if (metadata.entries_added == 0) {
+ return 0;
+ }
+ if (metadata.size == 0 && id < metadata.last_entry_id) {
+ return static_cast<int64_t>(metadata.entries_added);
+ }
+ if (id == metadata.last_entry_id) {
+ return static_cast<int64_t>(metadata.entries_added);
+ } else if (id > metadata.last_entry_id) {
+ return -1;
+ }
+ if (metadata.max_deleted_entry_id == StreamEntryID{0, 0} ||
metadata.max_deleted_entry_id < metadata.first_entry_id) {
+ if (id < metadata.first_entry_id) {
+ return static_cast<int64_t>(metadata.entries_added - metadata.size);
+ } else if (id == metadata.first_entry_id) {
+ return static_cast<int64_t>(metadata.entries_added - metadata.size + 1);
+ }
+ }
+ return -1;
+}
+
+static void CheckLagValid(const StreamMetadata &stream_metadata,
StreamConsumerGroupMetadata &group_metadata) {
+ bool valid = false;
+ if (stream_metadata.entries_added == 0) {
+ group_metadata.lag = 0;
+ valid = true;
+ } else if (group_metadata.entries_read != -1 &&
+ !StreamRangeHasTombstones(stream_metadata,
group_metadata.last_delivered_id)) {
+ group_metadata.lag = stream_metadata.entries_added -
group_metadata.entries_read;
+ valid = true;
+ } else {
+ int64_t entries_read =
StreamEstimateDistanceFromFirstEverEntry(stream_metadata,
group_metadata.last_delivered_id);
+ if (entries_read != -1) {
+ group_metadata.lag = stream_metadata.entries_added - entries_read;
+ valid = true;
+ }
+ }
+ if (!valid) {
+ group_metadata.lag = UINT64_MAX;
+ }
+}
+
+rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
+ std::vector<std::pair<std::string,
StreamConsumerGroupMetadata>> &group_metadata) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) return s;
+
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(next_version_prefix_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (identifySubkeyType(iter->key()) ==
StreamSubkeyType::StreamConsumerGroupMetadata) {
+ std::string group_name = groupNameFromInternalKey(iter->key());
+ StreamConsumerGroupMetadata cg_metadata =
decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
+ CheckLagValid(metadata, cg_metadata);
+ std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name,
cg_metadata);
+ group_metadata.push_back(tmp_item);
+ }
+ }
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status Stream::GetConsumerInfo(
+ const Slice &stream_name, const std::string &group_name,
+ std::vector<std::pair<std::string, StreamConsumerMetadata>>
&consumer_metadata) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) return s;
+
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(next_version_prefix_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (identifySubkeyType(iter->key()) ==
StreamSubkeyType::StreamConsumerMetadata) {
+ std::string cur_group_name = groupNameFromInternalKey(iter->key());
+ if (cur_group_name != group_name) continue;
+ std::string consumer_name = consumerNameFromInternalKey(iter->key());
+ StreamConsumerMetadata c_metadata =
decodeStreamConsumerMetadataValue(iter->value().ToString());
+ std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name,
c_metadata);
+ consumer_metadata.push_back(tmp_item);
+ }
+ }
+ return rocksdb::Status::OK();
+}
+
rocksdb::Status Stream::Range(const Slice &stream_name, const
StreamRangeOptions &options,
std::vector<StreamEntry> *entries) {
entries->clear();
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 76bc17a6..8ae5a14d 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -49,6 +49,10 @@ class Stream : public SubKeyScanner {
rocksdb::Status DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions
&options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t
count, StreamInfo *info);
+ rocksdb::Status GetGroupInfo(const Slice &stream_name,
+ std::vector<std::pair<std::string,
StreamConsumerGroupMetadata>> &group_metadata);
+ rocksdb::Status GetConsumerInfo(const Slice &stream_name, const std::string
&group_name,
+ std::vector<std::pair<std::string,
StreamConsumerMetadata>> &consumer_metadata);
rocksdb::Status Range(const Slice &stream_name, const StreamRangeOptions
&options, std::vector<StreamEntry> *entries);
rocksdb::Status Trim(const Slice &stream_name, const StreamTrimOptions
&options, uint64_t *delete_cnt);
rocksdb::Status GetMetadata(const Slice &stream_name, StreamMetadata
*metadata);
@@ -75,7 +79,10 @@ class Stream : public SubKeyScanner {
static StreamConsumerGroupMetadata
decodeStreamConsumerGroupMetadataValue(const std::string &value);
std::string internalKeyFromConsumerName(const std::string &ns_key, const
StreamMetadata &metadata,
const std::string &group_name, const
std::string &consumer_name) const;
+ std::string consumerNameFromInternalKey(rocksdb::Slice key) const;
static std::string encodeStreamConsumerMetadataValue(const
StreamConsumerMetadata &consumer_metadata);
+ static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const
std::string &value);
+ StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key);
};
} // namespace redis
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 9899f0e8..7ff62e6e 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -157,7 +157,7 @@ struct StreamLenOptions {
struct StreamXGroupCreateOptions {
bool mkstream = false;
- int64_t entries_read = 0;
+ int64_t entries_read = -1;
std::string last_id;
};
@@ -175,6 +175,13 @@ struct StreamConsumerMetadata {
uint64_t last_active;
};
+enum class StreamSubkeyType {
+ StreamEntry = 0,
+ StreamConsumerGroupMetadata = 1,
+ StreamConsumerMetadata = 2,
+ StreamPelEntry = 3,
+};
+
struct StreamInfo {
uint64_t size;
uint64_t entries_added;
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 05822387..d3a1f8d2 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -929,6 +929,52 @@ func TestStreamOffset(t *testing.T) {
require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entriesread", "-100").Err())
require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entriesread", "100").Err())
})
+
+ t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) {
+ streamName := "test-stream"
+ group1 := "t1"
+ group2 := "t2"
+ consumer1 := "c1"
+ consumer2 := "c2"
+ consumer3 := "c3"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "1-0",
+ Values: []string{"data", "a"},
+ }).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1,
"$").Err())
+ r := rdb.XInfoGroups(ctx, streamName).Val()
+ require.Equal(t, group1, r[0].Name)
+ require.Equal(t, int64(0), r[0].Consumers)
+ require.Equal(t, int64(0), r[0].Pending)
+ require.Equal(t, "1-0", r[0].LastDeliveredID)
+ require.Equal(t, int64(0), r[0].EntriesRead)
+ require.Equal(t, int64(0), r[0].Lag)
+
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "2-0",
+ Values: []string{"data1", "b"},
+ }).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2,
"$").Err())
+ r = rdb.XInfoGroups(ctx, streamName).Val()
+ require.Equal(t, group2, r[1].Name)
+ require.Equal(t, "2-0", r[1].LastDeliveredID)
+
+ require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group1, consumer1).Err())
+ require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group1, consumer2).Err())
+ require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group2, consumer3).Err())
+ r = rdb.XInfoGroups(ctx, streamName).Val()
+ require.Equal(t, int64(2), r[0].Consumers)
+ require.Equal(t, int64(1), r[1].Consumers)
+
+ r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val()
+ require.Equal(t, consumer1, r1[0].Name)
+ require.Equal(t, consumer2, r1[1].Name)
+ r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
+ require.Equal(t, consumer3, r1[0].Name)
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {