This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 357efac9 Add the support of the XGROUP CREATE and DESTROY command
(#1704)
357efac9 is described below
commit 357efac993bfd2045ec6bbd6126d2ad01c00b42d
Author: Hauru <[email protected]>
AuthorDate: Sat Sep 2 00:03:44 2023 +0800
Add the support of the XGROUP CREATE and DESTROY command (#1704)
---
src/commands/cmd_stream.cc | 84 +++++++++++++++
src/storage/redis_metadata.cc | 5 +
src/storage/redis_metadata.h | 1 +
src/types/redis_stream.cc | 153 +++++++++++++++++++++++++++
src/types/redis_stream.h | 8 ++
src/types/redis_stream_base.h | 14 +++
tests/cppunit/types/stream_test.cc | 14 +++
tests/gocase/unit/type/stream/stream_test.go | 30 ++++++
8 files changed, 309 insertions(+)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index de9f7297..8e19b9d3 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -21,6 +21,7 @@
#include <memory>
#include <stdexcept>
+#include "command_parser.h"
#include "commander.h"
#include "error_constants.h"
#include "event_util.h"
@@ -205,6 +206,88 @@ class CommandXDel : public Commander {
std::vector<redis::StreamEntryID> ids_;
};
+// Handler for the XGROUP command. Two subcommands are supported:
+//   XGROUP CREATE <key> <group> <id | $> [MKSTREAM] [ENTRIESREAD <n>]
+//   XGROUP DESTROY <key> <group>
+class CommandXGroup : public Commander {
+ public:
+  // Reads the subcommand, stream name and group name, then validates the remaining
+  // arguments for the selected subcommand. CREATE options are stored in
+  // xgroup_create_options_.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    subcommand_ = util::ToLower(GET_OR_RET(parser.TakeStr()));
+    stream_name_ = GET_OR_RET(parser.TakeStr());
+    group_name_ = GET_OR_RET(parser.TakeStr());
+
+    if (subcommand_ == "destroy") {
+      if (args.size() == 4) {
+        return Status::OK();
+      }
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    if (subcommand_ != "create") {
+      return {Status::RedisParseErr, "unknown subcommand"};
+    }
+
+    // CREATE takes between 5 and 8 arguments in total (command name included).
+    const auto argc = args.size();
+    if (argc < 5 || argc > 8) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    xgroup_create_options_.last_id = GET_OR_RET(parser.TakeStr());
+
+    while (parser.Good()) {
+      if (parser.EatEqICase("mkstream")) {
+        xgroup_create_options_.mkstream = true;
+        continue;
+      }
+
+      if (!parser.EatEqICase("entriesread")) {
+        return parser.InvalidSyntax();
+      }
+
+      auto entries_read = parser.TakeInt<int64_t>();
+      if (!entries_read.IsOK()) {
+        return {Status::RedisParseErr, errValueNotInteger};
+      }
+
+      // Any negative value other than -1 is rejected.
+      const int64_t value = entries_read.GetValue();
+      if (value < -1) {
+        return {Status::RedisParseErr, "value for ENTRIESREAD must be positive or -1"};
+      }
+      xgroup_create_options_.entries_read = value;
+    }
+
+    return Status::OK();
+  }
+
+  // Runs the parsed subcommand against the stream. CREATE replies with the simple
+  // string "OK"; DESTROY replies with integer 1 when at least one key was removed
+  // and 0 otherwise. Storage failures are reported as RedisExecErr.
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::Stream stream_db(svr->storage, conn->GetNamespace());
+
+    if (subcommand_ == "create") {
+      auto create_status = stream_db.CreateGroup(stream_name_, xgroup_create_options_, group_name_);
+      if (!create_status.ok()) {
+        return {Status::RedisExecErr, create_status.ToString()};
+      }
+
+      *output = redis::SimpleString("OK");
+      return Status::OK();
+    }
+
+    if (subcommand_ == "destroy") {
+      uint64_t removed = 0;
+      auto destroy_status = stream_db.DestroyGroup(stream_name_, group_name_, &removed);
+      if (!destroy_status.ok()) {
+        return {Status::RedisExecErr, destroy_status.ToString()};
+      }
+
+      *output = redis::Integer(removed > 0 ? 1 : 0);
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  std::string subcommand_;   // lower-cased subcommand name
+  std::string stream_name_;  // key of the target stream
+  std::string group_name_;   // consumer group to create or destroy
+  StreamXGroupCreateOptions xgroup_create_options_;  // only filled in for CREATE
+};
+
class CommandXLen : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
@@ -969,6 +1052,7 @@ class CommandXSetId : public Commander {
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write", 1, 1, 1),
+ MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2,
2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1,
1, 1),
MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0,
0, 0),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only",
1, 1, 1),
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 46d64f95..ee0df37f 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -358,6 +358,7 @@ void StreamMetadata::Encode(std::string *dst) {
PutFixed64(dst, last_entry_id.seq);
PutFixed64(dst, entries_added);
+ PutFixed64(dst, group_number);
}
rocksdb::Status StreamMetadata::Decode(Slice input) {
@@ -396,6 +397,10 @@ rocksdb::Status StreamMetadata::Decode(Slice input) {
GetFixed64(&input, &entries_added);
+ if (input.size() >= 8) {
+ GetFixed64(&input, &group_number);
+ }
+
return rocksdb::Status::OK();
}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index d1530d19..068df149 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -196,6 +196,7 @@ class StreamMetadata : public Metadata {
redis::StreamEntryID first_entry_id;
redis::StreamEntryID last_entry_id;
uint64_t entries_added = 0;
+ uint64_t group_number = 0;
explicit StreamMetadata(bool generate_version = true) :
Metadata(kRedisStream, generate_version) {}
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 8b27e936..7a7ffb16 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -30,6 +30,7 @@
namespace redis {
+constexpr const char *consumerGroupMetadataDelimiter = "METADATA";
const char *errSetEntryIdSmallerThanLastGenerated =
"The ID specified in XSETID is smaller than the target stream top item";
const char *errEntriesAddedSmallerThanStreamSize =
@@ -38,6 +39,9 @@ const char *errMaxDeletedIdGreaterThanLastGenerated =
"The ID specified in XSETID is smaller than the provided
max_deleted_entry_id";
const char *errEntriesAddedNotSpecifiedForEmptyStream = "an empty stream
should have non-zero value of ENTRIESADDED";
const char *errMaxDeletedIdNotSpecifiedForEmptyStream = "an empty stream
should have MAXDELETEDID";
+// Error text returned when an XGROUP subcommand targets a key that does not exist.
+// The sentences are joined with adjacent string literals rather than a backslash line
+// continuation: the continuation dropped the separator and produced "exist.Note".
+const char *errXGroupSubcommandRequiresKeyExist =
+    "The XGROUP subcommand requires the key to exist. "
+    "Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.";
rocksdb::Status Stream::GetMetadata(const Slice &stream_name, StreamMetadata
*metadata) {
return Database::GetMetadata(kRedisStream, stream_name, metadata);
@@ -160,6 +164,155 @@ rocksdb::Status Stream::Add(const Slice &stream_name,
const StreamAddOptions &op
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+// Returns the encoded internal key under which the metadata record of consumer group
+// `group_name` is stored. The sub-key is laid out as
+//   <fixed64 length of group_name><group_name><"METADATA">
+// and is combined with the namespaced stream key and the stream's current version.
+std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const StreamMetadata &metadata,
+                                             const std::string &group_name) const {
+  std::string group_sub_key;
+  PutFixed64(&group_sub_key, group_name.size());
+  group_sub_key.append(group_name);
+  group_sub_key.append(consumerGroupMetadataDelimiter);
+  return InternalKey(ns_key, group_sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
+}
+
+// Extracts the consumer group name from an encoded group-metadata internal key, i.e. a
+// key whose sub-key is <fixed64 name length><name><"METADATA">.
+// Returns an empty string when the sub-key is too short to hold that layout or when the
+// recorded length does not fit in front of the delimiter.
+std::string Stream::groupNameFromInternalKey(rocksdb::Slice key) const {
+  InternalKey ikey(key, storage_->IsSlotIdEncoded());
+  Slice sub_key = ikey.GetSubKey();
+
+  // Validate sizes before subtracting: size() is unsigned, so "size() - delimiter length"
+  // on a short sub-key would wrap around and let any length through.
+  const size_t delimiter_len = strlen(consumerGroupMetadataDelimiter);
+  if (sub_key.size() < sizeof(uint64_t) + delimiter_len) {
+    return {};
+  }
+
+  uint64_t name_len = 0;
+  GetFixed64(&sub_key, &name_len);  // consumes the 8-byte length prefix
+  if (name_len > sub_key.size() - delimiter_len) {
+    return {};
+  }
+
+  return std::string(sub_key.data(), static_cast<size_t>(name_len));
+}
+
+// Serialises a consumer group metadata record as six consecutive fixed64 fields, in this
+// order: consumer_number, pending_number, last_delivered_id.ms, last_delivered_id.seq,
+// entries_read (reinterpreted as unsigned), lag.
+std::string Stream::encodeStreamConsumerGroupMetadataValue(
+    const StreamConsumerGroupMetadata &consumer_group_metadata) {
+  const auto &meta = consumer_group_metadata;
+
+  std::string encoded;
+  PutFixed64(&encoded, meta.consumer_number);
+  PutFixed64(&encoded, meta.pending_number);
+  PutFixed64(&encoded, meta.last_delivered_id.ms);
+  PutFixed64(&encoded, meta.last_delivered_id.seq);
+  PutFixed64(&encoded, static_cast<uint64_t>(meta.entries_read));
+  PutFixed64(&encoded, meta.lag);
+  return encoded;
+}
+
+// Inverse of encodeStreamConsumerGroupMetadataValue: reads six fixed64 fields from
+// `value` in the order consumer_number, pending_number, last_delivered_id.ms,
+// last_delivered_id.seq, entries_read, lag. entries_read is stored unsigned and cast
+// back to its signed type.
+StreamConsumerGroupMetadata Stream::decodeStreamConsumerGroupMetadataValue(const std::string &value) {
+  StreamConsumerGroupMetadata decoded;
+  rocksdb::Slice cursor(value);
+
+  GetFixed64(&cursor, &decoded.consumer_number);
+  GetFixed64(&cursor, &decoded.pending_number);
+  GetFixed64(&cursor, &decoded.last_delivered_id.ms);
+  GetFixed64(&cursor, &decoded.last_delivered_id.seq);
+
+  uint64_t raw_entries_read = 0;
+  GetFixed64(&cursor, &raw_entries_read);
+  decoded.entries_read = static_cast<int64_t>(raw_entries_read);
+
+  GetFixed64(&cursor, &decoded.lag);
+  return decoded;
+}
+
+// Creates consumer group `group_name` on stream `stream_name`.
+//
+// - Group names starting with a decimal digit are rejected.
+// - If the stream does not exist, the call fails unless options.mkstream is set; with
+//   mkstream the stream metadata is written together with the group.
+// - options.last_id == "$" starts the group at the stream's current last entry ID; any
+//   other value is parsed as a stream entry ID.
+// - If the group already exists, InvalidArgument("BUSYGROUP ...") is returned.
+//
+// On success the group metadata record and the updated stream metadata (group_number
+// incremented) are written in one batch.
+rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options,
+                                    const std::string &group_name) {
+  // std::isdigit has undefined behaviour for negative char values, so convert through
+  // unsigned char first (group names may contain arbitrary bytes).
+  if (std::isdigit(static_cast<unsigned char>(group_name[0]))) {
+    return rocksdb::Status::InvalidArgument("group name cannot start with number");
+  }
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (s.IsNotFound() && !options.mkstream) {
+    return rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  StreamConsumerGroupMetadata consumer_group_metadata;
+  if (options.last_id == "$") {
+    consumer_group_metadata.last_delivered_id = metadata.last_entry_id;
+  } else {
+    // Distinct name: this is a parse Status, not the rocksdb::Status `s` above.
+    auto parse_status = ParseStreamEntryID(options.last_id, &consumer_group_metadata.last_delivered_id);
+    if (!parse_status.IsOK()) {
+      return rocksdb::Status::InvalidArgument(parse_status.Msg());
+    }
+  }
+  consumer_group_metadata.entries_read = options.entries_read;
+  std::string entry_key = internalKeyFromGroupName(ns_key, metadata, group_name);
+  std::string entry_value = encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+
+  // Refuse to overwrite an existing group; only a NotFound result lets us proceed.
+  std::string get_entry_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &get_entry_value);
+  if (s.ok()) {
+    return rocksdb::Status::InvalidArgument("BUSYGROUP Consumer Group name already exists");
+  }
+  if (!s.IsNotFound()) {
+    return s;
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  batch->Put(stream_cf_handle_, entry_key, entry_value);
+  metadata.group_number += 1;
+  std::string metadata_bytes;
+  metadata.Encode(&metadata_bytes);
+  batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
+  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
+// Removes consumer group `group_name` from stream `stream_name`.
+//
+// *delete_cnt receives the number of keys deleted (0 when the group does not exist).
+// Returns InvalidArgument when the stream itself does not exist. When at least one key
+// was deleted, the stream metadata is rewritten with group_number decremented.
+rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string &group_name,
+                                     uint64_t *delete_cnt) {
+  *delete_cnt = 0;
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  // Every key owned by the group has a sub-key starting with <fixed64 name length><name>.
+  std::string sub_key_prefix;
+  PutFixed64(&sub_key_prefix, group_name.size());
+  sub_key_prefix += group_name;
+  std::string next_version_prefix_key =
+      InternalKey(ns_key, sub_key_prefix, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
+  std::string prefix_key = InternalKey(ns_key, sub_key_prefix, metadata.version, storage_->IsSlotIdEncoded()).Encode();
+
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  LatestSnapShot ss(storage_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(next_version_prefix_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix_key);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    // The bounds only restrict the scan to [prefix @ version, prefix @ version + 1), which
+    // also contains every sub-key of this stream that sorts after the group prefix (other
+    // groups and, presumably, the stream entries themselves). Keys sharing the prefix are
+    // contiguous from the lower bound, so stop at the first one that does not match.
+    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
+    if (!ikey.GetSubKey().starts_with(sub_key_prefix)) {
+      break;
+    }
+    batch->Delete(stream_cf_handle_, iter->key());
+    *delete_cnt += 1;
+  }
+
+  if (*delete_cnt != 0) {
+    // Metadata decoded from a value without the field reports group_number == 0; do not
+    // let the unsigned counter wrap around.
+    if (metadata.group_number > 0) {
+      metadata.group_number -= 1;
+    }
+    std::string metadata_bytes;
+    metadata.Encode(&metadata_bytes);
+    batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
+  }
+
+  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
rocksdb::Status Stream::DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids,
uint64_t *deleted_cnt) {
*deleted_cnt = 0;
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 28aa88fc..81cdc111 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -39,6 +39,9 @@ class Stream : public SubKeyScanner {
: SubKeyScanner(storage, ns),
stream_cf_handle_(storage->GetCFHandle("stream")) {}
rocksdb::Status Add(const Slice &stream_name, const StreamAddOptions
&options, const std::vector<std::string> &values,
StreamEntryID *id);
+ rocksdb::Status CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
+ const std::string &group_name);
+ rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string
&group_name, uint64_t *delete_cnt);
rocksdb::Status DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions
&options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t
count, StreamInfo *info);
@@ -61,6 +64,11 @@ class Stream : public SubKeyScanner {
const StreamEntryID &id) const;
uint64_t trim(const std::string &ns_key, const StreamTrimOptions &options,
StreamMetadata *metadata,
rocksdb::WriteBatch *batch);
+ std::string internalKeyFromGroupName(const std::string &ns_key, const
StreamMetadata &metadata,
+ const std::string &group_name) const;
+ std::string groupNameFromInternalKey(rocksdb::Slice key) const;
+ static std::string encodeStreamConsumerGroupMetadataValue(const
StreamConsumerGroupMetadata &consumer_group_metadata);
+ static StreamConsumerGroupMetadata
decodeStreamConsumerGroupMetadataValue(const std::string &value);
};
} // namespace redis
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 17a5c8a1..7b0d7b61 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -155,6 +155,20 @@ struct StreamLenOptions {
bool to_first = false;
};
+// Options accepted by XGROUP CREATE. Member order matters: callers use positional
+// aggregate initialisation (e.g. {true, 0, "$"}).
+struct StreamXGroupCreateOptions {
+ // Set when MKSTREAM is given; allows creating the group on a stream that does not exist yet.
+ bool mkstream = false;
+ // Value of the ENTRIESREAD option; the command parser accepts non-negative values and -1.
+ int64_t entries_read = 0;
+ // Starting ID for the group: "$" selects the stream's last entry ID, anything else is
+ // parsed as a stream entry ID.
+ std::string last_id;
+};
+
+// Per-group metadata record stored in the stream column family. It is serialised as six
+// fixed64 fields in declaration order (last_delivered_id contributes ms then seq).
+struct StreamConsumerGroupMetadata {
+ // NOTE(review): presumably the number of consumers in the group; nothing in this change
+ // updates it beyond the default.
+ uint64_t consumer_number = 0;
+ // NOTE(review): presumably the number of pending entries; not updated in this change.
+ uint64_t pending_number = 0;
+ // Set at creation from the XGROUP CREATE id argument ("$" -> stream's last entry ID).
+ StreamEntryID last_delivered_id;
+ // Copied from the ENTRIESREAD option at creation; may be -1.
+ int64_t entries_read = 0;
+ // NOTE(review): not written anywhere in this change; meaning to be confirmed.
+ uint64_t lag = 0;
+};
+
struct StreamInfo {
uint64_t size;
uint64_t entries_added;
diff --git a/tests/cppunit/types/stream_test.cc
b/tests/cppunit/types/stream_test.cc
index 9ed7bb33..b3f54e6a 100644
--- a/tests/cppunit/types/stream_test.cc
+++ b/tests/cppunit/types/stream_test.cc
@@ -2512,3 +2512,17 @@ TEST_F(RedisStreamTest,
StreamSetIdLastIdGreaterThanExisting) {
EXPECT_EQ(info.entries_added, added);
EXPECT_EQ(info.max_deleted_entry_id.ToString(), max_del_id->ToString());
}
+
+// Creates a group with MKSTREAM on a stream that does not exist, then destroys it twice:
+// the first destroy must remove something, the second must find nothing left. The status
+// of every call is checked, not only the deletion counter.
+TEST_F(RedisStreamTest, StreamConsumerGroupCreateAndDestroy) {
+  redis::StreamXGroupCreateOptions create_options = {true, 0, "$"};
+  std::string stream_name = "TestStream";
+  std::string group_name = "TestGroup";
+  auto s = stream_->CreateGroup(stream_name, create_options, group_name);
+  EXPECT_TRUE(s.ok());
+
+  uint64_t delete_cnt = 0;
+  s = stream_->DestroyGroup(stream_name, group_name, &delete_cnt);
+  EXPECT_TRUE(s.ok());
+  EXPECT_TRUE(delete_cnt != 0);
+
+  // The stream still exists (MKSTREAM created it), so destroying a missing group is not
+  // an error; it just deletes nothing.
+  delete_cnt = 0;
+  s = stream_->DestroyGroup(stream_name, group_name, &delete_cnt);
+  EXPECT_TRUE(s.ok());
+  EXPECT_TRUE(delete_cnt == 0);
+}
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 1559ca2f..a91d791c 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -856,6 +856,36 @@ func TestStreamOffset(t *testing.T) {
require.Less(t, ts, now+5_000)
require.EqualValues(t, providedSeqNum, seqNum)
})
+
+ t.Run("XGROUP CREATE with different kinds of commands and XGROUP
DESTROY", func(t *testing.T) {
+ streamName := "test-stream-a"
+ groupName := "test-group-a"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // No such stream (No such key)
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$").Err())
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD", "10").Err())
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD").Err())
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "MKSTREAM", "ENTRIESREAD").Err())
+ require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "MKSTREAM").Err())
+ require.NoError(t, rdb.XInfoStream(ctx, streamName).Err())
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$").Err())
+ // Invalid syntax
+ groupName = "test-group-b"
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName,
groupName, "$").Err())
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIEREAD", "10").Err())
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD", "-10").Err())
+ require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
"1test-group-c", "$").Err())
+
+ require.NoError(t, rdb.Del(ctx, "myStream").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup",
"$").Err())
+ result, err := rdb.XGroupDestroy(ctx, "myStream",
"myGroup").Result()
+ require.NoError(t, err)
+ require.Equal(t, int64(1), result)
+ result, err = rdb.XGroupDestroy(ctx, "myStream",
"myGroup").Result()
+ require.NoError(t, err)
+ require.Equal(t, int64(0), result)
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {