This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 7d61e940 Add support of the command XGROUP DELCONSUMER (#2141)
7d61e940 is described below

commit 7d61e94007098f759a630ff9e04c5b44a3f69c3c
Author: Hauru <[email protected]>
AuthorDate: Sat Mar 9 21:41:21 2024 +0800

    Add support of the command XGROUP DELCONSUMER (#2141)
---
 src/commands/cmd_stream.cc                   | 12 ++++-
 src/types/redis_stream.cc                    | 71 +++++++++++++++++++++++++
 src/types/redis_stream.h                     |  2 +
 tests/gocase/unit/type/stream/stream_test.go | 78 ++++++++++++++++++++++++++++
 4 files changed, 162 insertions(+), 1 deletion(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 04ea2942..a90caece 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -250,7 +250,7 @@ class CommandXGroup : public Commander {
       return Status::OK();
     }
 
-    if (subcommand_ == "createconsumer") {
+    if (subcommand_ == "createconsumer" || subcommand_ == "delconsumer") {
       if (args.size() != 5) {
         return {Status::RedisParseErr, errWrongNumOfArguments};
       }
@@ -322,6 +322,16 @@ class CommandXGroup : public Commander {
       *output = redis::Integer(created_number);
     }
 
+    if (subcommand_ == "delconsumer") {
+      uint64_t deleted_pel = 0;
+      auto s = stream_db.DestroyConsumer(stream_name_, group_name_, consumer_name_, deleted_pel);
+      if (!s.ok()) {
+        return {Status::RedisExecErr, s.ToString()};
+      }
+
+      *output = redis::Integer(deleted_pel);
+    }
+
     if (subcommand_ == "setid") {
       auto s = stream_db.GroupSetId(stream_name_, group_name_, xgroup_create_options_);
       if (!s.ok()) {
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 0bebd28b..476c5ba4 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -475,6 +475,77 @@ rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const std::stri
   return createConsumerWithoutLock(stream_name, group_name, consumer_name, created_number);
 }
 
+rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const std::string &group_name,
+                                        const std::string &consumer_name, uint64_t &deleted_pel) {
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " + group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::OK();
+  }
+
+  StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+  deleted_pel = consumer_metadata.pending_number;
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Minimum());
+  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());
+
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  LatestSnapShot ss(storage_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(end_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix_key);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
+      std::string tmp_group_name;
+      FMT_MAYBE_UNUSED StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+      if (tmp_group_name != group_name) continue;
+      StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
+      if (pel_entry.consumer_name == consumer_name) {
+        batch->Delete(stream_cf_handle_, iter->key());
+      }
+    }
+  }
+  batch->Delete(stream_cf_handle_, consumer_key);
+  StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
+  group_metadata.consumer_number -= 1;
+  group_metadata.pending_number -= deleted_pel;
+  batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_metadata));
+  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
 rocksdb::Status Stream::GroupSetId(const Slice &stream_name, const std::string &group_name,
                                    const StreamXGroupCreateOptions &options) {
   std::string ns_key = AppendNamespacePrefix(stream_name);
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 4381e4ef..8fa7bb70 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -44,6 +44,8 @@ class Stream : public SubKeyScanner {
   rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string &group_name, uint64_t *delete_cnt);
   rocksdb::Status CreateConsumer(const Slice &stream_name, const std::string &group_name,
                                  const std::string &consumer_name, int *created_number);
+  rocksdb::Status DestroyConsumer(const Slice &stream_name, const std::string &group_name,
+                                  const std::string &consumer_name, uint64_t &deleted_pel);
   rocksdb::Status GroupSetId(const Slice &stream_name, const std::string &group_name,
                              const StreamXGroupCreateOptions &options);
   rocksdb::Status DeleteEntries(const Slice &stream_name, const std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index d74ff2b0..66ae0705 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -919,6 +919,84 @@ func TestStreamOffset(t *testing.T) {
                require.Equal(t, int64(0), r)
        })
 
+       t.Run("XGROUP DELCONSUMER with different kinds of commands", func(t *testing.T) {
+               streamName := "test-stream"
+               groupName := "test-group"
+               consumerName := "test-consumer"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               //No such stream
+               require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"data", "a"},
+               }).Err())
+               //no such group
+               require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())
+               require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "*",
+                       Values: []string{"data1", "a1"},
+               }).Err())
+               require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Err())
+               ri, erri := rdb.XInfoGroups(ctx, streamName).Result()
+               require.NoError(t, erri)
+               require.Equal(t, int64(1), ri[0].Consumers)
+               require.Equal(t, int64(1), ri[0].Pending)
+
+               r, err := rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Result()
+               require.NoError(t, err)
+               require.Equal(t, int64(1), r)
+               ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+               require.NoError(t, erri)
+               require.Equal(t, int64(0), ri[0].Consumers)
+               require.Equal(t, int64(0), ri[0].Pending)
+
+               require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "*",
+                       Values: []string{"data2", "a2"},
+               }).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "*",
+                       Values: []string{"data3", "a3"},
+               }).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "*",
+                       Values: []string{"data4", "a4"},
+               }).Err())
+               require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    3,
+                       NoAck:    false,
+               }).Err())
+               ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+               require.NoError(t, erri)
+               require.Equal(t, int64(1), ri[0].Consumers)
+               require.Equal(t, int64(3), ri[0].Pending)
+               r, err = rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Result()
+               require.NoError(t, err)
+               require.Equal(t, int64(3), r)
+               ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+               require.NoError(t, erri)
+               require.Equal(t, int64(0), ri[0].Consumers)
+               require.Equal(t, int64(0), ri[0].Pending)
+       })
+
        t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) {
                streamName := "test-stream"
                groupName := "test-group"

Reply via email to