This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 7d61e940 Add support of the command XGROUP DELCONSUMER (#2141)
7d61e940 is described below
commit 7d61e94007098f759a630ff9e04c5b44a3f69c3c
Author: Hauru <[email protected]>
AuthorDate: Sat Mar 9 21:41:21 2024 +0800
Add support of the command XGROUP DELCONSUMER (#2141)
---
src/commands/cmd_stream.cc | 12 ++++-
src/types/redis_stream.cc | 71 +++++++++++++++++++++++++
src/types/redis_stream.h | 2 +
tests/gocase/unit/type/stream/stream_test.go | 78 ++++++++++++++++++++++++++++
4 files changed, 162 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 04ea2942..a90caece 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -250,7 +250,7 @@ class CommandXGroup : public Commander {
return Status::OK();
}
- if (subcommand_ == "createconsumer") {
+ if (subcommand_ == "createconsumer" || subcommand_ == "delconsumer") {
if (args.size() != 5) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
@@ -322,6 +322,16 @@ class CommandXGroup : public Commander {
*output = redis::Integer(created_number);
}
+ if (subcommand_ == "delconsumer") {
+ uint64_t deleted_pel = 0;
+ auto s = stream_db.DestroyConsumer(stream_name_, group_name_,
consumer_name_, deleted_pel);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ *output = redis::Integer(deleted_pel);
+ }
+
if (subcommand_ == "setid") {
auto s = stream_db.GroupSetId(stream_name_, group_name_,
xgroup_create_options_);
if (!s.ok()) {
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 0bebd28b..476c5ba4 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -475,6 +475,77 @@ rocksdb::Status Stream::CreateConsumer(const Slice
&stream_name, const std::stri
return createConsumerWithoutLock(stream_name, group_name, consumer_name,
created_number);
}
+rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const
std::string &group_name,
+ const std::string &consumer_name,
uint64_t &deleted_pel) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata;
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+ }
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::OK();
+ }
+
+ StreamConsumerMetadata consumer_metadata =
decodeStreamConsumerMetadataValue(get_consumer_value);
+ deleted_pel = consumer_metadata.pending_number;
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+
+ std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, StreamEntryID::Minimum());
+ std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, StreamEntryID::Maximum());
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(end_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
+ std::string tmp_group_name;
+ FMT_MAYBE_UNUSED StreamEntryID entry_id =
groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+ if (tmp_group_name != group_name) continue;
+ StreamPelEntry pel_entry =
decodeStreamPelEntryValue(iter->value().ToString());
+ if (pel_entry.consumer_name == consumer_name) {
+ batch->Delete(stream_cf_handle_, iter->key());
+ }
+ }
+ }
+ batch->Delete(stream_cf_handle_, consumer_key);
+ StreamConsumerGroupMetadata group_metadata =
decodeStreamConsumerGroupMetadataValue(get_group_value);
+ group_metadata.consumer_number -= 1;
+ group_metadata.pending_number -= deleted_pel;
+ batch->Put(stream_cf_handle_, group_key,
encodeStreamConsumerGroupMetadataValue(group_metadata));
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
rocksdb::Status Stream::GroupSetId(const Slice &stream_name, const std::string
&group_name,
const StreamXGroupCreateOptions &options) {
std::string ns_key = AppendNamespacePrefix(stream_name);
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 4381e4ef..8fa7bb70 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -44,6 +44,8 @@ class Stream : public SubKeyScanner {
rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string
&group_name, uint64_t *delete_cnt);
rocksdb::Status CreateConsumer(const Slice &stream_name, const std::string
&group_name,
const std::string &consumer_name, int
*created_number);
+ rocksdb::Status DestroyConsumer(const Slice &stream_name, const std::string
&group_name,
+ const std::string &consumer_name, uint64_t
&deleted_pel);
rocksdb::Status GroupSetId(const Slice &stream_name, const std::string
&group_name,
const StreamXGroupCreateOptions &options);
rocksdb::Status DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index d74ff2b0..66ae0705 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -919,6 +919,84 @@ func TestStreamOffset(t *testing.T) {
require.Equal(t, int64(0), r)
})
+ t.Run("XGROUP DELCONSUMER with different kinds of commands", func(t
*testing.T) {
+ streamName := "test-stream"
+ groupName := "test-group"
+ consumerName := "test-consumer"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ //No such stream
+ require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "1-0",
+ Values: []string{"data", "a"},
+ }).Err())
+ //no such group
+ require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
+ require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "*",
+ Values: []string{"data1", "a1"},
+ }).Err())
+ require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Err())
+ ri, erri := rdb.XInfoGroups(ctx, streamName).Result()
+ require.NoError(t, erri)
+ require.Equal(t, int64(1), ri[0].Consumers)
+ require.Equal(t, int64(1), ri[0].Pending)
+
+ r, err := rdb.XGroupDelConsumer(ctx, streamName, groupName,
consumerName).Result()
+ require.NoError(t, err)
+ require.Equal(t, int64(1), r)
+ ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+ require.NoError(t, erri)
+ require.Equal(t, int64(0), ri[0].Consumers)
+ require.Equal(t, int64(0), ri[0].Pending)
+
+ require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "*",
+ Values: []string{"data2", "a2"},
+ }).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "*",
+ Values: []string{"data3", "a3"},
+ }).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "*",
+ Values: []string{"data4", "a4"},
+ }).Err())
+ require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 3,
+ NoAck: false,
+ }).Err())
+ ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+ require.NoError(t, erri)
+ require.Equal(t, int64(1), ri[0].Consumers)
+ require.Equal(t, int64(3), ri[0].Pending)
+ r, err = rdb.XGroupDelConsumer(ctx, streamName, groupName,
consumerName).Result()
+ require.NoError(t, err)
+ require.Equal(t, int64(3), r)
+ ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+ require.NoError(t, erri)
+ require.Equal(t, int64(0), ri[0].Consumers)
+ require.Equal(t, int64(0), ri[0].Pending)
+ })
+
t.Run("XGROUP SETID with different kinds of commands", func(t
*testing.T) {
streamName := "test-stream"
groupName := "test-group"