This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new e38021ca Add support of the XGROUP CREATECONSUMER command (#1799)
e38021ca is described below

commit e38021ca509d21624957f217aae516360ff77adb
Author: Hauru <[email protected]>
AuthorDate: Thu Oct 12 11:44:35 2023 +0800

    Add support of the XGROUP CREATECONSUMER command (#1799)
---
 src/commands/cmd_stream.cc                   | 20 ++++++++
 src/types/redis_stream.cc                    | 74 ++++++++++++++++++++++++++++
 src/types/redis_stream.h                     |  5 ++
 src/types/redis_stream_base.h                |  6 +++
 tests/gocase/unit/type/stream/stream_test.go | 22 +++++++++
 5 files changed, 127 insertions(+)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 8e19b9d3..a950a30d 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -249,6 +249,15 @@ class CommandXGroup : public Commander {
       return Status::OK();
     }
 
+    if (subcommand_ == "createconsumer") {
+      if (args.size() != 5) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+      consumer_name_ = GET_OR_RET(parser.TakeStr());
+
+      return Status::OK();
+    }
+
     return {Status::RedisParseErr, "unknown subcommand"};
   }
 
@@ -278,6 +287,16 @@ class CommandXGroup : public Commander {
       }
     }
 
+    if (subcommand_ == "createconsumer") {
+      int created_number = 0;
+      auto s = stream_db.CreateConsumer(stream_name_, group_name_, 
consumer_name_, &created_number);
+      if (!s.ok()) {
+        return {Status::RedisExecErr, s.ToString()};
+      }
+
+      *output = redis::Integer(created_number);
+    }
+
     return Status::OK();
   }
 
@@ -285,6 +304,7 @@ class CommandXGroup : public Commander {
   std::string subcommand_;
   std::string stream_name_;
   std::string group_name_;
+  std::string consumer_name_;
   StreamXGroupCreateOptions xgroup_create_options_;
 };
 
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 7a7ffb16..daa5fd41 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -27,6 +27,7 @@
 #include <vector>
 
 #include "db_util.h"
+#include "time_util.h"
 
 namespace redis {
 
@@ -211,6 +212,26 @@ StreamConsumerGroupMetadata 
Stream::decodeStreamConsumerGroupMetadataValue(const
   return consumer_group_metadata;
 }
 
+std::string Stream::internalKeyFromConsumerName(const std::string &ns_key, 
const StreamMetadata &metadata,
+                                                const std::string &group_name, 
const std::string &consumer_name) const {
+  std::string sub_key;
+  PutFixed64(&sub_key, group_name.size());
+  sub_key += group_name;
+  PutFixed64(&sub_key, consumer_name.size());
+  sub_key += consumer_name;
+  sub_key += consumerGroupMetadataDelimiter;
+  std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  return entry_key;
+}
+
+std::string Stream::encodeStreamConsumerMetadataValue(const 
StreamConsumerMetadata &consumer_metadata) {
+  std::string dst;
+  PutFixed64(&dst, consumer_metadata.pending_number);
+  PutFixed64(&dst, consumer_metadata.last_idle);
+  PutFixed64(&dst, consumer_metadata.last_active);
+  return dst;
+}
+
 rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
                                     const std::string &group_name) {
   if (std::isdigit(group_name[0])) {
@@ -313,6 +334,59 @@ rocksdb::Status Stream::DestroyGroup(const Slice 
&stream_name, const std::string
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const 
std::string &group_name,
+                                       const std::string &consumer_name, int 
*created_number) {
+  if (std::isdigit(consumer_name[0])) {
+    return rocksdb::Status::InvalidArgument("consumer name cannot start with 
number");
+  }
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return 
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  std::string entry_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string get_entry_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&get_entry_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+
+  StreamConsumerMetadata consumer_metadata;
+  auto now = util::GetTimeStampMS();
+  consumer_metadata.last_idle = now;
+  consumer_metadata.last_active = now;
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
+  std::string consumer_value = 
encodeStreamConsumerMetadataValue(consumer_metadata);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, 
&get_consumer_value);
+  if (!s.IsNotFound()) {
+    return s;
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  batch->Put(stream_cf_handle_, consumer_key, consumer_value);
+  StreamConsumerGroupMetadata consumer_group_metadata = 
decodeStreamConsumerGroupMetadataValue(get_entry_value);
+  consumer_group_metadata.consumer_number += 1;
+  std::string consumer_group_metadata_bytes = 
encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+  batch->Put(stream_cf_handle_, entry_key, consumer_group_metadata_bytes);
+  s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+  if (s.ok()) *created_number = 1;
+  return s;
+}
+
 rocksdb::Status Stream::DeleteEntries(const Slice &stream_name, const 
std::vector<StreamEntryID> &ids,
                                       uint64_t *deleted_cnt) {
   *deleted_cnt = 0;
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 81cdc111..75a51ece 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -42,6 +42,8 @@ class Stream : public SubKeyScanner {
   rocksdb::Status CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
                               const std::string &group_name);
   rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string 
&group_name, uint64_t *delete_cnt);
+  rocksdb::Status CreateConsumer(const Slice &stream_name, const std::string 
&group_name,
+                                 const std::string &consumer_name, int 
*created_number);
   rocksdb::Status DeleteEntries(const Slice &stream_name, const 
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
   rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions 
&options, uint64_t *size);
   rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t 
count, StreamInfo *info);
@@ -69,6 +71,9 @@ class Stream : public SubKeyScanner {
   std::string groupNameFromInternalKey(rocksdb::Slice key) const;
   static std::string encodeStreamConsumerGroupMetadataValue(const 
StreamConsumerGroupMetadata &consumer_group_metadata);
   static StreamConsumerGroupMetadata 
decodeStreamConsumerGroupMetadataValue(const std::string &value);
+  std::string internalKeyFromConsumerName(const std::string &ns_key, const 
StreamMetadata &metadata,
+                                          const std::string &group_name, const 
std::string &consumer_name) const;
+  static std::string encodeStreamConsumerMetadataValue(const 
StreamConsumerMetadata &consumer_metadata);
 };
 
 }  // namespace redis
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 7b0d7b61..f5d4496d 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -169,6 +169,12 @@ struct StreamConsumerGroupMetadata {
   uint64_t lag = 0;
 };
 
+struct StreamConsumerMetadata {
+  uint64_t pending_number = 0;
+  uint64_t last_idle;
+  uint64_t last_active;
+};
+
 struct StreamInfo {
   uint64_t size;
   uint64_t entries_added;
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index a91d791c..8b2b67d3 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -886,6 +886,28 @@ func TestStreamOffset(t *testing.T) {
                require.NoError(t, err)
                require.Equal(t, int64(0), result)
        })
+
+       t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t 
*testing.T) {
+               streamName := "test-stream"
+               groupName := "test-group"
+               consumerName := "test-consumer"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               //No such stream
+               require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, 
groupName, consumerName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"data", "a"},
+               }).Err())
+               //no such group
+               require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, 
groupName, consumerName).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"$").Err())
+
+               r := rdb.XGroupCreateConsumer(ctx, streamName, groupName, 
consumerName).Val()
+               require.Equal(t, int64(1), r)
+               r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, 
consumerName).Val()
+               require.Equal(t, int64(0), r)
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to