This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 357efac9 Add the support of the XGROUP CREATE and DESTROY command 
(#1704)
357efac9 is described below

commit 357efac993bfd2045ec6bbd6126d2ad01c00b42d
Author: Hauru <[email protected]>
AuthorDate: Sat Sep 2 00:03:44 2023 +0800

    Add the support of the XGROUP CREATE and DESTROY command (#1704)
---
 src/commands/cmd_stream.cc                   |  84 +++++++++++++++
 src/storage/redis_metadata.cc                |   5 +
 src/storage/redis_metadata.h                 |   1 +
 src/types/redis_stream.cc                    | 153 +++++++++++++++++++++++++++
 src/types/redis_stream.h                     |   8 ++
 src/types/redis_stream_base.h                |  14 +++
 tests/cppunit/types/stream_test.cc           |  14 +++
 tests/gocase/unit/type/stream/stream_test.go |  30 ++++++
 8 files changed, 309 insertions(+)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index de9f7297..8e19b9d3 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <stdexcept>
 
+#include "command_parser.h"
 #include "commander.h"
 #include "error_constants.h"
 #include "event_util.h"
@@ -205,6 +206,88 @@ class CommandXDel : public Commander {
   std::vector<redis::StreamEntryID> ids_;
 };
 
+class CommandXGroup : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    subcommand_ = util::ToLower(GET_OR_RET(parser.TakeStr()));
+    stream_name_ = GET_OR_RET(parser.TakeStr());
+    group_name_ = GET_OR_RET(parser.TakeStr());
+
+    if (subcommand_ == "create") {
+      if (args.size() < 5 || args.size() > 8) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      xgroup_create_options_.last_id = GET_OR_RET(parser.TakeStr());
+
+      while (parser.Good()) {
+        if (parser.EatEqICase("mkstream")) {
+          xgroup_create_options_.mkstream = true;
+        } else if (parser.EatEqICase("entriesread")) {
+          auto parse_result = parser.TakeInt<int64_t>();
+          if (!parse_result.IsOK()) {
+            return {Status::RedisParseErr, errValueNotInteger};
+          }
+          if (parse_result.GetValue() < 0 && parse_result.GetValue() != -1) {
+            return {Status::RedisParseErr, "value for ENTRIESREAD must be 
positive or -1"};
+          }
+          xgroup_create_options_.entries_read = parse_result.GetValue();
+        } else {
+          return parser.InvalidSyntax();
+        }
+      }
+
+      return Status::OK();
+    }
+
+    if (subcommand_ == "destroy") {
+      if (args.size() != 4) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      return Status::OK();
+    }
+
+    return {Status::RedisParseErr, "unknown subcommand"};
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::Stream stream_db(svr->storage, conn->GetNamespace());
+
+    if (subcommand_ == "create") {
+      auto s = stream_db.CreateGroup(stream_name_, xgroup_create_options_, 
group_name_);
+      if (!s.ok()) {
+        return {Status::RedisExecErr, s.ToString()};
+      }
+
+      *output = redis::SimpleString("OK");
+    }
+
+    if (subcommand_ == "destroy") {
+      uint64_t delete_cnt = 0;
+      auto s = stream_db.DestroyGroup(stream_name_, group_name_, &delete_cnt);
+      if (!s.ok()) {
+        return {Status::RedisExecErr, s.ToString()};
+      }
+
+      if (delete_cnt > 0) {
+        *output = redis::Integer(1);
+      } else {
+        *output = redis::Integer(0);
+      }
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  std::string subcommand_;
+  std::string stream_name_;
+  std::string group_name_;
+  StreamXGroupCreateOptions xgroup_create_options_;
+};
+
 class CommandXLen : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -969,6 +1052,7 @@ class CommandXSetId : public Commander {
 
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
                         MakeCmdAttr<CommandXDel>("xdel", -3, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 
2, 1),
                         MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 
1, 1),
                         MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 
0, 0),
                         MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 
1, 1, 1),
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 46d64f95..ee0df37f 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -358,6 +358,7 @@ void StreamMetadata::Encode(std::string *dst) {
   PutFixed64(dst, last_entry_id.seq);
 
   PutFixed64(dst, entries_added);
+  PutFixed64(dst, group_number);
 }
 
 rocksdb::Status StreamMetadata::Decode(Slice input) {
@@ -396,6 +397,10 @@ rocksdb::Status StreamMetadata::Decode(Slice input) {
 
   GetFixed64(&input, &entries_added);
 
+  if (input.size() >= 8) {
+    GetFixed64(&input, &group_number);
+  }
+
   return rocksdb::Status::OK();
 }
 
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index d1530d19..068df149 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -196,6 +196,7 @@ class StreamMetadata : public Metadata {
   redis::StreamEntryID first_entry_id;
   redis::StreamEntryID last_entry_id;
   uint64_t entries_added = 0;
+  uint64_t group_number = 0;
 
   explicit StreamMetadata(bool generate_version = true) : 
Metadata(kRedisStream, generate_version) {}
 
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 8b27e936..7a7ffb16 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -30,6 +30,7 @@
 
 namespace redis {
 
+constexpr const char *consumerGroupMetadataDelimiter = "METADATA";
 const char *errSetEntryIdSmallerThanLastGenerated =
     "The ID specified in XSETID is smaller than the target stream top item";
 const char *errEntriesAddedSmallerThanStreamSize =
@@ -38,6 +39,9 @@ const char *errMaxDeletedIdGreaterThanLastGenerated =
     "The ID specified in XSETID is smaller than the provided 
max_deleted_entry_id";
 const char *errEntriesAddedNotSpecifiedForEmptyStream = "an empty stream 
should have non-zero value of ENTRIESADDED";
 const char *errMaxDeletedIdNotSpecifiedForEmptyStream = "an empty stream 
should have MAXDELETEDID";
+const char *errXGroupSubcommandRequiresKeyExist =
+    "The XGROUP subcommand requires the key to exist.\
+Note that for CREATE you may want to use the MKSTREAM option to create an 
empty stream automatically.";
 
 rocksdb::Status Stream::GetMetadata(const Slice &stream_name, StreamMetadata 
*metadata) {
   return Database::GetMetadata(kRedisStream, stream_name, metadata);
@@ -160,6 +164,155 @@ rocksdb::Status Stream::Add(const Slice &stream_name, 
const StreamAddOptions &op
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const 
StreamMetadata &metadata,
+                                             const std::string &group_name) 
const {
+  std::string sub_key;
+  PutFixed64(&sub_key, group_name.size());
+  sub_key += group_name;
+  sub_key += consumerGroupMetadataDelimiter;
+  std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  return entry_key;
+}
+
+std::string Stream::groupNameFromInternalKey(rocksdb::Slice key) const {
+  InternalKey ikey(key, storage_->IsSlotIdEncoded());
+  Slice group_name_metadata = ikey.GetSubKey();
+  uint64_t len = 0;
+  GetFixed64(&group_name_metadata, &len);
+  std::string group_name;
+  if (len <= group_name_metadata.size() - 
strlen(consumerGroupMetadataDelimiter)) {
+    group_name = group_name_metadata.ToString().substr(0, len);
+  }
+  return group_name;
+}
+
+std::string Stream::encodeStreamConsumerGroupMetadataValue(const 
StreamConsumerGroupMetadata &consumer_group_metadata) {
+  std::string dst;
+  PutFixed64(&dst, consumer_group_metadata.consumer_number);
+  PutFixed64(&dst, consumer_group_metadata.pending_number);
+  PutFixed64(&dst, consumer_group_metadata.last_delivered_id.ms);
+  PutFixed64(&dst, consumer_group_metadata.last_delivered_id.seq);
+  PutFixed64(&dst, 
static_cast<uint64_t>(consumer_group_metadata.entries_read));
+  PutFixed64(&dst, consumer_group_metadata.lag);
+  return dst;
+}
+
+StreamConsumerGroupMetadata 
Stream::decodeStreamConsumerGroupMetadataValue(const std::string &value) {
+  StreamConsumerGroupMetadata consumer_group_metadata;
+  rocksdb::Slice input(value);
+  GetFixed64(&input, &consumer_group_metadata.consumer_number);
+  GetFixed64(&input, &consumer_group_metadata.pending_number);
+  GetFixed64(&input, &consumer_group_metadata.last_delivered_id.ms);
+  GetFixed64(&input, &consumer_group_metadata.last_delivered_id.seq);
+  uint64_t entries_read = 0;
+  GetFixed64(&input, &entries_read);
+  consumer_group_metadata.entries_read = static_cast<int64_t>(entries_read);
+  GetFixed64(&input, &consumer_group_metadata.lag);
+  return consumer_group_metadata;
+}
+
+rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
+                                    const std::string &group_name) {
+  if (std::isdigit(group_name[0])) {
+    return rocksdb::Status::InvalidArgument("group name cannot start with 
number");
+  }
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (s.IsNotFound() && !options.mkstream) {
+    return 
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  StreamConsumerGroupMetadata consumer_group_metadata;
+  if (options.last_id == "$") {
+    consumer_group_metadata.last_delivered_id = metadata.last_entry_id;
+  } else {
+    auto s = ParseStreamEntryID(options.last_id, 
&consumer_group_metadata.last_delivered_id);
+    if (!s.IsOK()) {
+      return rocksdb::Status::InvalidArgument(s.Msg());
+    }
+  }
+  consumer_group_metadata.entries_read = options.entries_read;
+  std::string entry_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string entry_value = 
encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  std::string get_entry_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&get_entry_value);
+  if (!s.IsNotFound()) {
+    if (!s.ok()) {
+      return s;
+    }
+    return rocksdb::Status::InvalidArgument("BUSYGROUP Consumer Group name 
already exists");
+  }
+
+  batch->Put(stream_cf_handle_, entry_key, entry_value);
+  metadata.group_number += 1;
+  std::string metadata_bytes;
+  metadata.Encode(&metadata_bytes);
+  batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
+  return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
+rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const 
std::string &group_name, uint64_t *delete_cnt) {
+  *delete_cnt = 0;
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (s.IsNotFound()) {
+    return 
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  std::string sub_key_prefix;
+  PutFixed64(&sub_key_prefix, group_name.size());
+  sub_key_prefix += group_name;
+  std::string next_version_prefix_key =
+      InternalKey(ns_key, sub_key_prefix, metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
+  std::string prefix_key = InternalKey(ns_key, sub_key_prefix, 
metadata.version, storage_->IsSlotIdEncoded()).Encode();
+
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  LatestSnapShot ss(storage_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(next_version_prefix_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix_key);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    batch->Delete(stream_cf_handle_, iter->key());
+    *delete_cnt += 1;
+  }
+
+  if (*delete_cnt != 0) {
+    metadata.group_number -= 1;
+    std::string metadata_bytes;
+    metadata.Encode(&metadata_bytes);
+    batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
+  }
+
+  return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
 rocksdb::Status Stream::DeleteEntries(const Slice &stream_name, const 
std::vector<StreamEntryID> &ids,
                                       uint64_t *deleted_cnt) {
   *deleted_cnt = 0;
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 28aa88fc..81cdc111 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -39,6 +39,9 @@ class Stream : public SubKeyScanner {
       : SubKeyScanner(storage, ns), 
stream_cf_handle_(storage->GetCFHandle("stream")) {}
   rocksdb::Status Add(const Slice &stream_name, const StreamAddOptions 
&options, const std::vector<std::string> &values,
                       StreamEntryID *id);
+  rocksdb::Status CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
+                              const std::string &group_name);
+  rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string 
&group_name, uint64_t *delete_cnt);
   rocksdb::Status DeleteEntries(const Slice &stream_name, const 
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
   rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions 
&options, uint64_t *size);
   rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t 
count, StreamInfo *info);
@@ -61,6 +64,11 @@ class Stream : public SubKeyScanner {
                                      const StreamEntryID &id) const;
   uint64_t trim(const std::string &ns_key, const StreamTrimOptions &options, 
StreamMetadata *metadata,
                 rocksdb::WriteBatch *batch);
+  std::string internalKeyFromGroupName(const std::string &ns_key, const 
StreamMetadata &metadata,
+                                       const std::string &group_name) const;
+  std::string groupNameFromInternalKey(rocksdb::Slice key) const;
+  static std::string encodeStreamConsumerGroupMetadataValue(const 
StreamConsumerGroupMetadata &consumer_group_metadata);
+  static StreamConsumerGroupMetadata 
decodeStreamConsumerGroupMetadataValue(const std::string &value);
 };
 
 }  // namespace redis
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 17a5c8a1..7b0d7b61 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -155,6 +155,20 @@ struct StreamLenOptions {
   bool to_first = false;
 };
 
+struct StreamXGroupCreateOptions {
+  bool mkstream = false;
+  int64_t entries_read = 0;
+  std::string last_id;
+};
+
+struct StreamConsumerGroupMetadata {
+  uint64_t consumer_number = 0;
+  uint64_t pending_number = 0;
+  StreamEntryID last_delivered_id;
+  int64_t entries_read = 0;
+  uint64_t lag = 0;
+};
+
 struct StreamInfo {
   uint64_t size;
   uint64_t entries_added;
diff --git a/tests/cppunit/types/stream_test.cc 
b/tests/cppunit/types/stream_test.cc
index 9ed7bb33..b3f54e6a 100644
--- a/tests/cppunit/types/stream_test.cc
+++ b/tests/cppunit/types/stream_test.cc
@@ -2512,3 +2512,17 @@ TEST_F(RedisStreamTest, 
StreamSetIdLastIdGreaterThanExisting) {
   EXPECT_EQ(info.entries_added, added);
   EXPECT_EQ(info.max_deleted_entry_id.ToString(), max_del_id->ToString());
 }
+
+TEST_F(RedisStreamTest, StreamConsumerGroupCreateAndDestroy) {
+  redis::StreamXGroupCreateOptions create_options = {true, 0, "$"};
+  std::string stream_name = "TestStream";
+  std::string group_name = "TestGroup";
+  auto s = stream_->CreateGroup(stream_name, create_options, group_name);
+  EXPECT_TRUE(s.ok());
+  uint64_t delete_cnt = 0;
+  s = stream_->DestroyGroup(stream_name, group_name, &delete_cnt);
+  EXPECT_TRUE(delete_cnt != 0);
+  delete_cnt = 0;
+  s = stream_->DestroyGroup(stream_name, group_name, &delete_cnt);
+  EXPECT_TRUE(delete_cnt == 0);
+}
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 1559ca2f..a91d791c 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -856,6 +856,36 @@ func TestStreamOffset(t *testing.T) {
                require.Less(t, ts, now+5_000)
                require.EqualValues(t, providedSeqNum, seqNum)
        })
+
+       t.Run("XGROUP CREATE with different kinds of commands and XGROUP 
DESTROY", func(t *testing.T) {
+               streamName := "test-stream-a"
+               groupName := "test-group-a"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               // No such stream (No such key)
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
groupName, "$").Err())
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
groupName, "$", "ENTRIESREAD", "10").Err())
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
groupName, "$", "ENTRIESREAD").Err())
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
groupName, "$", "MKSTREAM", "ENTRIESREAD").Err())
+               require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
groupName, "$", "MKSTREAM").Err())
+               require.NoError(t, rdb.XInfoStream(ctx, streamName).Err())
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
groupName, "$").Err())
+               // Invalid syntax
+               groupName = "test-group-b"
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName, 
groupName, "$").Err())
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
groupName, "$", "ENTRIEREAD", "10").Err())
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
groupName, "$", "ENTRIESREAD", "-10").Err())
+               require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, 
"1test-group-c", "$").Err())
+
+               require.NoError(t, rdb.Del(ctx, "myStream").Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup", 
"$").Err())
+               result, err := rdb.XGroupDestroy(ctx, "myStream", 
"myGroup").Result()
+               require.NoError(t, err)
+               require.Equal(t, int64(1), result)
+               result, err = rdb.XGroupDestroy(ctx, "myStream", 
"myGroup").Result()
+               require.NoError(t, err)
+               require.Equal(t, int64(0), result)
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to