This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new be983181 Add support of the XINFO GROUPS and CONSUMERS (#1870)
be983181 is described below

commit be983181ae3f751486706409e662cbc9322682c8
Author: Hauru <[email protected]>
AuthorDate: Sun Nov 12 16:47:59 2023 +0800

    Add support of the XINFO GROUPS and CONSUMERS (#1870)
    
    Co-authored-by: hulk <[email protected]>
---
 src/commands/cmd_stream.cc                   |  92 ++++++++++++++-
 src/types/redis_stream.cc                    | 166 ++++++++++++++++++++++++++-
 src/types/redis_stream.h                     |   7 ++
 src/types/redis_stream_base.h                |   9 +-
 tests/gocase/unit/type/stream/stream_test.go |  46 ++++++++
 5 files changed, 312 insertions(+), 8 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 45ed098a..0f8d6b96 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -26,6 +26,7 @@
 #include "error_constants.h"
 #include "event_util.h"
 #include "server/server.h"
+#include "time_util.h"
 #include "types/redis_stream.h"
 
 namespace redis {
@@ -386,7 +387,7 @@ class CommandXInfo : public Commander {
   Status Parse(const std::vector<std::string> &args) override {
     auto val = util::ToLower(args[1]);
     if (val == "stream" && args.size() >= 2) {
-      stream_ = true;
+      subcommand_ = "stream";
 
       if (args.size() > 3 && util::ToLower(args[3]) == "full") {
         full_ = true;
@@ -400,20 +401,35 @@ class CommandXInfo : public Commander {
 
         count_ = *parse_result;
       }
+    } else if (val == "groups" && args.size() == 3) {
+      subcommand_ = "groups";
+    } else if (val == "consumers" && args.size() == 4) {
+      subcommand_ = "consumers";
+    } else {
+      return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
     }
+
     return Status::OK();
   }
 
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
-    if (stream_) {
+    if (subcommand_ == "stream") {
       return getStreamInfo(srv, conn, output);
     }
+
+    if (subcommand_ == "groups") {
+      return getGroupInfo(srv, conn, output);
+    }
+
+    if (subcommand_ == "consumers") {
+      return getConsumerInfo(srv, conn, output);
+    }
     return Status::OK();
   }
 
  private:
   uint64_t count_ = 10;  // default Redis value
-  bool stream_ = false;
+  std::string subcommand_;
   bool full_ = false;
 
   Status getStreamInfo(Server *srv, Connection *conn, std::string *output) {
@@ -472,6 +488,76 @@ class CommandXInfo : public Commander {
 
     return Status::OK();
   }
+
+  Status getGroupInfo(Server *srv, Connection *conn, std::string *output) {
+    redis::Stream stream_db(srv->storage, conn->GetNamespace());
+    std::vector<std::pair<std::string, StreamConsumerGroupMetadata>> result_vector;
+    auto s = stream_db.GetGroupInfo(args_[2], result_vector);
+    if (!s.ok() && !s.IsNotFound()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    if (s.IsNotFound()) {
+      return {Status::RedisExecErr, errNoSuchKey};
+    }
+
+    output->append(redis::MultiLen(result_vector.size()));
+    for (auto const &it : result_vector) {
+      output->append(redis::MultiLen(12));
+      output->append(redis::BulkString("name"));
+      output->append(redis::BulkString(it.first));
+      output->append(redis::BulkString("consumers"));
+      output->append(redis::Integer(it.second.consumer_number));
+      output->append(redis::BulkString("pending"));
+      output->append(redis::Integer(it.second.pending_number));
+      output->append(redis::BulkString("last-delivered-id"));
+      output->append(redis::BulkString(it.second.last_delivered_id.ToString()));
+      output->append(redis::BulkString("entries-read"));
+      if (it.second.entries_read == -1) {
+        output->append(redis::NilString());
+      } else {
+        output->append(redis::Integer(it.second.entries_read));
+      }
+      output->append(redis::BulkString("lag"));
+      if (it.second.lag == UINT64_MAX) {
+        output->append(redis::NilString());
+      } else {
+        output->append(redis::Integer(it.second.lag));
+      }
+    }
+
+    return Status::OK();
+  }
+
+  Status getConsumerInfo(Server *srv, Connection *conn, std::string *output) {
+    redis::Stream stream_db(srv->storage, conn->GetNamespace());
+    std::vector<std::pair<std::string, redis::StreamConsumerMetadata>> result_vector;
+    auto s = stream_db.GetConsumerInfo(args_[2], args_[3], result_vector);
+
+    if (!s.ok() && !s.IsNotFound()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    if (s.IsNotFound()) {
+      return {Status::RedisExecErr, errNoSuchKey};
+    }
+
+    output->append(redis::MultiLen(result_vector.size()));
+    auto now = util::GetTimeStampMS();
+    for (auto const &it : result_vector) {
+      output->append(redis::MultiLen(8));
+      output->append(redis::BulkString("name"));
+      output->append(redis::BulkString(it.first));
+      output->append(redis::BulkString("pending"));
+      output->append(redis::Integer(it.second.pending_number));
+      output->append(redis::BulkString("idle"));
+      output->append(redis::Integer(now - it.second.last_idle));
+      output->append(redis::BulkString("inactive"));
+      output->append(redis::Integer(now - it.second.last_active));
+    }
+
+    return Status::OK();
+  }
 };
 
 class CommandXRange : public Commander {
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index ddf63c91..bc050ad0 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -31,7 +31,7 @@
 
 namespace redis {
 
-constexpr const char *consumerGroupMetadataDelimiter = "METADATA";
+std::string_view consumerGroupMetadataDelimiter = "METADATA";
 const char *errSetEntryIdSmallerThanLastGenerated =
     "The ID specified in XSETID is smaller than the target stream top item";
 const char *errEntriesAddedSmallerThanStreamSize =
@@ -181,9 +181,7 @@ std::string Stream::groupNameFromInternalKey(rocksdb::Slice key) const {
   uint64_t len = 0;
   GetFixed64(&group_name_metadata, &len);
   std::string group_name;
-  if (len <= group_name_metadata.size() - strlen(consumerGroupMetadataDelimiter)) {
-    group_name = group_name_metadata.ToString().substr(0, len);
-  }
+  group_name = group_name_metadata.ToString().substr(0, len);
   return group_name;
 }
 
@@ -224,6 +222,17 @@ std::string Stream::internalKeyFromConsumerName(const std::string &ns_key, const
   return entry_key;
 }
 
+std::string Stream::consumerNameFromInternalKey(rocksdb::Slice key) const {
+  InternalKey ikey(key, storage_->IsSlotIdEncoded());
+  Slice subkey = ikey.GetSubKey();
+  uint64_t group_name_len = 0;
+  GetFixed64(&subkey, &group_name_len);
+  subkey.remove_prefix(group_name_len);
+  uint64_t consumer_name_len = 0;
+  GetFixed64(&subkey, &consumer_name_len);
+  return subkey.ToString().substr(0, consumer_name_len);
+}
+
 std::string Stream::encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata) {
   std::string dst;
   PutFixed64(&dst, consumer_metadata.pending_number);
@@ -232,6 +241,35 @@ std::string Stream::encodeStreamConsumerMetadataValue(const StreamConsumerMetada
   return dst;
 }
 
+StreamConsumerMetadata Stream::decodeStreamConsumerMetadataValue(const std::string &value) {
+  StreamConsumerMetadata consumer_metadata;
+  rocksdb::Slice input(value);
+  GetFixed64(&input, &consumer_metadata.pending_number);
+  GetFixed64(&input, &consumer_metadata.last_idle);
+  GetFixed64(&input, &consumer_metadata.last_active);
+  return consumer_metadata;
+}
+
+StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) {
+  InternalKey ikey(key, storage_->IsSlotIdEncoded());
+  Slice subkey = ikey.GetSubKey();
+  const size_t entry_id_size = sizeof(StreamEntryID);
+  if (subkey.size() <= entry_id_size) {
+    return StreamSubkeyType::StreamEntry;
+  }
+  uint64_t group_name_len = 0;
+  GetFixed64(&subkey, &group_name_len);
+  std::string without_group_name = subkey.ToString().substr(group_name_len);
+  const size_t metadata_delimiter_size = consumerGroupMetadataDelimiter.size();
+  if (without_group_name.size() <= metadata_delimiter_size) {
+    return StreamSubkeyType::StreamConsumerGroupMetadata;
+  }
+  if (without_group_name.size() <= entry_id_size) {
+    return StreamSubkeyType::StreamPelEntry;
+  }
+  return StreamSubkeyType::StreamConsumerMetadata;
+}
+
 rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options,
                                     const std::string &group_name) {
   if (std::isdigit(group_name[0])) {
@@ -737,6 +775,126 @@ rocksdb::Status Stream::GetStreamInfo(const rocksdb::Slice &stream_name, bool fu
   return rocksdb::Status::OK();
 }
 
+static bool StreamRangeHasTombstones(const StreamMetadata &metadata, StreamEntryID start_id) {
+  StreamEntryID end_id = StreamEntryID{UINT64_MAX, UINT64_MAX};
+  if (metadata.size == 0 || metadata.max_deleted_entry_id == StreamEntryID{0, 0}) {
+    return false;
+  }
+  if (metadata.first_entry_id > metadata.max_deleted_entry_id) {
+    return false;
+  }
+  return (start_id <= metadata.max_deleted_entry_id && metadata.max_deleted_entry_id <= end_id);
+}
+
+static int64_t StreamEstimateDistanceFromFirstEverEntry(const StreamMetadata &metadata, StreamEntryID id) {
+  if (metadata.entries_added == 0) {
+    return 0;
+  }
+  if (metadata.size == 0 && id < metadata.last_entry_id) {
+    return static_cast<int64_t>(metadata.entries_added);
+  }
+  if (id == metadata.last_entry_id) {
+    return static_cast<int64_t>(metadata.entries_added);
+  } else if (id > metadata.last_entry_id) {
+    return -1;
+  }
+  if (metadata.max_deleted_entry_id == StreamEntryID{0, 0} || metadata.max_deleted_entry_id < metadata.first_entry_id) {
+    if (id < metadata.first_entry_id) {
+      return static_cast<int64_t>(metadata.entries_added - metadata.size);
+    } else if (id == metadata.first_entry_id) {
+      return static_cast<int64_t>(metadata.entries_added - metadata.size + 1);
+    }
+  }
+  return -1;
+}
+
+static void CheckLagValid(const StreamMetadata &stream_metadata, StreamConsumerGroupMetadata &group_metadata) {
+  bool valid = false;
+  if (stream_metadata.entries_added == 0) {
+    group_metadata.lag = 0;
+    valid = true;
+  } else if (group_metadata.entries_read != -1 &&
+             !StreamRangeHasTombstones(stream_metadata, group_metadata.last_delivered_id)) {
+    group_metadata.lag = stream_metadata.entries_added - group_metadata.entries_read;
+    valid = true;
+  } else {
+    int64_t entries_read = StreamEstimateDistanceFromFirstEverEntry(stream_metadata, group_metadata.last_delivered_id);
+    if (entries_read != -1) {
+      group_metadata.lag = stream_metadata.entries_added - entries_read;
+      valid = true;
+    }
+  }
+  if (!valid) {
+    group_metadata.lag = UINT64_MAX;
+  }
+}
+
+rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
+                                     std::vector<std::pair<std::string, StreamConsumerGroupMetadata>> &group_metadata) {
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok()) return s;
+
+  std::string next_version_prefix_key =
+      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
+  std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
+
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  LatestSnapShot ss(storage_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(next_version_prefix_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix_key);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamConsumerGroupMetadata) {
+      std::string group_name = groupNameFromInternalKey(iter->key());
+      StreamConsumerGroupMetadata cg_metadata = decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
+      CheckLagValid(metadata, cg_metadata);
+      std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name, cg_metadata);
+      group_metadata.push_back(tmp_item);
+    }
+  }
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status Stream::GetConsumerInfo(
+    const Slice &stream_name, const std::string &group_name,
+    std::vector<std::pair<std::string, StreamConsumerMetadata>> &consumer_metadata) {
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok()) return s;
+
+  std::string next_version_prefix_key =
+      InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
+  std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
+
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  LatestSnapShot ss(storage_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(next_version_prefix_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix_key);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamConsumerMetadata) {
+      std::string cur_group_name = groupNameFromInternalKey(iter->key());
+      if (cur_group_name != group_name) continue;
+      std::string consumer_name = consumerNameFromInternalKey(iter->key());
+      StreamConsumerMetadata c_metadata = decodeStreamConsumerMetadataValue(iter->value().ToString());
+      std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name, c_metadata);
+      consumer_metadata.push_back(tmp_item);
+    }
+  }
+  return rocksdb::Status::OK();
+}
+
 rocksdb::Status Stream::Range(const Slice &stream_name, const StreamRangeOptions &options,
                               std::vector<StreamEntry> *entries) {
   entries->clear();
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 76bc17a6..8ae5a14d 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -49,6 +49,10 @@ class Stream : public SubKeyScanner {
   rocksdb::Status DeleteEntries(const Slice &stream_name, const std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
   rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions &options, uint64_t *size);
   rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t count, StreamInfo *info);
+  rocksdb::Status GetGroupInfo(const Slice &stream_name,
+                               std::vector<std::pair<std::string, StreamConsumerGroupMetadata>> &group_metadata);
+  rocksdb::Status GetConsumerInfo(const Slice &stream_name, const std::string &group_name,
+                                  std::vector<std::pair<std::string, StreamConsumerMetadata>> &consumer_metadata);
   rocksdb::Status Range(const Slice &stream_name, const StreamRangeOptions &options, std::vector<StreamEntry> *entries);
   rocksdb::Status Trim(const Slice &stream_name, const StreamTrimOptions &options, uint64_t *delete_cnt);
   rocksdb::Status GetMetadata(const Slice &stream_name, StreamMetadata *metadata);
@@ -75,7 +79,10 @@ class Stream : public SubKeyScanner {
   static StreamConsumerGroupMetadata decodeStreamConsumerGroupMetadataValue(const std::string &value);
   std::string internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata,
                                           const std::string &group_name, const std::string &consumer_name) const;
+  std::string consumerNameFromInternalKey(rocksdb::Slice key) const;
   static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata);
+  static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const std::string &value);
+  StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key);
 };
 
 }  // namespace redis
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 9899f0e8..7ff62e6e 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -157,7 +157,7 @@ struct StreamLenOptions {
 
 struct StreamXGroupCreateOptions {
   bool mkstream = false;
-  int64_t entries_read = 0;
+  int64_t entries_read = -1;
   std::string last_id;
 };
 
@@ -175,6 +175,13 @@ struct StreamConsumerMetadata {
   uint64_t last_active;
 };
 
+enum class StreamSubkeyType {
+  StreamEntry = 0,
+  StreamConsumerGroupMetadata = 1,
+  StreamConsumerMetadata = 2,
+  StreamPelEntry = 3,
+};
+
 struct StreamInfo {
   uint64_t size;
   uint64_t entries_added;
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 05822387..d3a1f8d2 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -929,6 +929,52 @@ func TestStreamOffset(t *testing.T) {
                require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "-100").Err())
                require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "100").Err())
        })
+
+       t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) {
+               streamName := "test-stream"
+               group1 := "t1"
+               group2 := "t2"
+               consumer1 := "c1"
+               consumer2 := "c2"
+               consumer3 := "c3"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"data", "a"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1, "$").Err())
+               r := rdb.XInfoGroups(ctx, streamName).Val()
+               require.Equal(t, group1, r[0].Name)
+               require.Equal(t, int64(0), r[0].Consumers)
+               require.Equal(t, int64(0), r[0].Pending)
+               require.Equal(t, "1-0", r[0].LastDeliveredID)
+               require.Equal(t, int64(0), r[0].EntriesRead)
+               require.Equal(t, int64(0), r[0].Lag)
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "2-0",
+                       Values: []string{"data1", "b"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2, "$").Err())
+               r = rdb.XInfoGroups(ctx, streamName).Val()
+               require.Equal(t, group2, r[1].Name)
+               require.Equal(t, "2-0", r[1].LastDeliveredID)
+
+               require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer1).Err())
+               require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer2).Err())
+               require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group2, consumer3).Err())
+               r = rdb.XInfoGroups(ctx, streamName).Val()
+               require.Equal(t, int64(2), r[0].Consumers)
+               require.Equal(t, int64(1), r[1].Consumers)
+
+               r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val()
+               require.Equal(t, consumer1, r1[0].Name)
+               require.Equal(t, consumer2, r1[1].Name)
+               r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
+               require.Equal(t, consumer3, r1[0].Name)
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to