This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new e38021ca Add support of the XGROUP CREATECONSUMER command (#1799)
e38021ca is described below
commit e38021ca509d21624957f217aae516360ff77adb
Author: Hauru <[email protected]>
AuthorDate: Thu Oct 12 11:44:35 2023 +0800
Add support of the XGROUP CREATECONSUMER command (#1799)
---
src/commands/cmd_stream.cc | 20 ++++++++
src/types/redis_stream.cc | 74 ++++++++++++++++++++++++++++
src/types/redis_stream.h | 5 ++
src/types/redis_stream_base.h | 6 +++
tests/gocase/unit/type/stream/stream_test.go | 22 +++++++++
5 files changed, 127 insertions(+)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 8e19b9d3..a950a30d 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -249,6 +249,15 @@ class CommandXGroup : public Commander {
return Status::OK();
}
+ if (subcommand_ == "createconsumer") {
+ if (args.size() != 5) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+ consumer_name_ = GET_OR_RET(parser.TakeStr());
+
+ return Status::OK();
+ }
+
return {Status::RedisParseErr, "unknown subcommand"};
}
@@ -278,6 +287,16 @@ class CommandXGroup : public Commander {
}
}
+ if (subcommand_ == "createconsumer") {
+ int created_number = 0;
+ auto s = stream_db.CreateConsumer(stream_name_, group_name_,
consumer_name_, &created_number);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ *output = redis::Integer(created_number);
+ }
+
return Status::OK();
}
@@ -285,6 +304,7 @@ class CommandXGroup : public Commander {
std::string subcommand_;
std::string stream_name_;
std::string group_name_;
+ std::string consumer_name_;
StreamXGroupCreateOptions xgroup_create_options_;
};
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 7a7ffb16..daa5fd41 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -27,6 +27,7 @@
#include <vector>
#include "db_util.h"
+#include "time_util.h"
namespace redis {
@@ -211,6 +212,26 @@ StreamConsumerGroupMetadata
Stream::decodeStreamConsumerGroupMetadataValue(const
return consumer_group_metadata;
}
+std::string Stream::internalKeyFromConsumerName(const std::string &ns_key,
const StreamMetadata &metadata,
+ const std::string &group_name,
const std::string &consumer_name) const {  // builds the encoded key under which CreateConsumer stores one consumer's metadata
+ std::string sub_key;
+ PutFixed64(&sub_key, group_name.size());  // the group name is preceded by its length
+ sub_key += group_name;
+ PutFixed64(&sub_key, consumer_name.size());  // the consumer name is preceded by its length as well
+ sub_key += consumer_name;
+ sub_key += consumerGroupMetadataDelimiter;  // the sub-key ends with the consumer-group metadata delimiter
+ std::string entry_key = InternalKey(ns_key, sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();  // combine with the namespaced stream key and the stream's metadata version
+ return entry_key;
+}
+
+std::string Stream::encodeStreamConsumerMetadataValue(const
StreamConsumerMetadata &consumer_metadata) {  // serialises consumer_metadata into the string stored as the consumer's value
+ std::string dst;
+ PutFixed64(&dst, consumer_metadata.pending_number);  // field order is fixed: pending_number, last_idle, last_active
+ PutFixed64(&dst, consumer_metadata.last_idle);
+ PutFixed64(&dst, consumer_metadata.last_active);
+ return dst;
+}
+
rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
const std::string &group_name) {
if (std::isdigit(group_name[0])) {
@@ -313,6 +334,59 @@ rocksdb::Status Stream::DestroyGroup(const Slice
&stream_name, const std::string
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+// Registers `consumer_name` as a consumer of group `group_name` on stream `stream_name`.
+//
+// `*created_number` is set to 1 when a new consumer record was written and to 0 on every other path.
+// Returns:
+//   - InvalidArgument if the consumer name starts with a digit, if the stream does not exist,
+//     or if the group does not exist (NOGROUP message);
+//   - OK with `*created_number == 0` if the consumer is already registered;
+//   - otherwise whatever status the metadata/group/consumer reads or the final write produced.
+rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const std::string &group_name,
+                                       const std::string &consumer_name, int *created_number) {
+  // Give the out-parameter a defined value before any early return.
+  *created_number = 0;
+
+  // std::isdigit has undefined behaviour for negative char values (non-ASCII bytes where char is
+  // signed), so the byte is converted to unsigned char first.
+  if (std::isdigit(static_cast<unsigned char>(consumer_name[0]))) {
+    return rocksdb::Status::InvalidArgument("consumer name cannot start with number");
+  }
+
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  // Held for the rest of the function so the existence checks and the write form one unit.
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  // The group record must already exist; its value is updated further down.
+  std::string entry_key = internalKeyFromGroupName(ns_key, metadata, group_name);
+  std::string get_entry_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &get_entry_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " + group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+
+  StreamConsumerMetadata consumer_metadata;
+  auto now = util::GetTimeStampMS();
+  consumer_metadata.last_idle = now;
+  consumer_metadata.last_active = now;
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
+  std::string consumer_value = encodeStreamConsumerMetadataValue(consumer_metadata);
+
+  // Only a NotFound result lets us continue: OK means the consumer already exists (return OK with
+  // *created_number left at 0), anything else is a read error that is passed through.
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+  if (!s.IsNotFound()) {
+    return s;
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  // The new consumer record and the group's incremented consumer count go into the same batch.
+  batch->Put(stream_cf_handle_, consumer_key, consumer_value);
+  StreamConsumerGroupMetadata consumer_group_metadata = decodeStreamConsumerGroupMetadataValue(get_entry_value);
+  consumer_group_metadata.consumer_number += 1;
+  std::string consumer_group_metadata_bytes = encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+  batch->Put(stream_cf_handle_, entry_key, consumer_group_metadata_bytes);
+
+  s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+  if (s.ok()) *created_number = 1;
+  return s;
+}
+
rocksdb::Status Stream::DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids,
uint64_t *deleted_cnt) {
*deleted_cnt = 0;
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 81cdc111..75a51ece 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -42,6 +42,8 @@ class Stream : public SubKeyScanner {
rocksdb::Status CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
const std::string &group_name);
rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string
&group_name, uint64_t *delete_cnt);
+ rocksdb::Status CreateConsumer(const Slice &stream_name, const std::string
&group_name,
+ const std::string &consumer_name, int
*created_number);
rocksdb::Status DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions
&options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t
count, StreamInfo *info);
@@ -69,6 +71,9 @@ class Stream : public SubKeyScanner {
std::string groupNameFromInternalKey(rocksdb::Slice key) const;
static std::string encodeStreamConsumerGroupMetadataValue(const
StreamConsumerGroupMetadata &consumer_group_metadata);
static StreamConsumerGroupMetadata
decodeStreamConsumerGroupMetadataValue(const std::string &value);
+ std::string internalKeyFromConsumerName(const std::string &ns_key, const
StreamMetadata &metadata,
+ const std::string &group_name, const
std::string &consumer_name) const;
+ static std::string encodeStreamConsumerMetadataValue(const
StreamConsumerMetadata &consumer_metadata);
};
} // namespace redis
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 7b0d7b61..f5d4496d 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -169,6 +169,12 @@ struct StreamConsumerGroupMetadata {
uint64_t lag = 0;
};
+// Per-consumer record kept for each consumer of a stream consumer group; serialised by
+// Stream::encodeStreamConsumerMetadataValue in declaration order.
+struct StreamConsumerMetadata {
+  // Presumably the number of entries pending for this consumer; a new consumer starts at 0.
+  uint64_t pending_number = 0;
+  // Timestamps taken from util::GetTimeStampMS() when the consumer is created. They default to 0 so
+  // that a default-constructed value never hands indeterminate data to the encoder.
+  uint64_t last_idle = 0;
+  uint64_t last_active = 0;
+};
+
struct StreamInfo {
uint64_t size;
uint64_t entries_added;
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index a91d791c..8b2b67d3 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -886,6 +886,28 @@ func TestStreamOffset(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(0), result)
})
+
+	// Covers XGROUP CREATECONSUMER against a missing stream, a missing group, a new consumer name
+	// and a consumer name that is already registered.
+	t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) {
+		streamName := "test-stream"
+		groupName := "test-group"
+		consumerName := "test-consumer"
+		require.NoError(t, rdb.Del(ctx, streamName).Err())
+		// No such stream: the command must fail.
+		require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
+		require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+			Stream: streamName,
+			ID:     "1-0",
+			Values: []string{"data", "a"},
+		}).Err())
+		// The stream exists now but the group does not: the command must still fail.
+		require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
+		require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())
+
+		// Check the error explicitly: Val() yields 0 for a failed command, which would let the
+		// "already exists" assertion below pass even if the server rejected the request.
+		r, err := rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Result()
+		require.NoError(t, err)
+		require.Equal(t, int64(1), r)
+		// Creating the same consumer again succeeds but reports that nothing was created.
+		r, err = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Result()
+		require.NoError(t, err)
+		require.Equal(t, int64(0), r)
+	})
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {