This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 31d62fd9 Add support of the XGROUP SETID command (#1833)
31d62fd9 is described below
commit 31d62fd9a57cffe181b741ba57e9d80a0c41d392
Author: Hauru <[email protected]>
AuthorDate: Thu Oct 19 21:24:59 2023 +0800
Add support of the XGROUP SETID command (#1833)
---
src/commands/cmd_stream.cc | 33 +++++++++++++++++++++
src/types/redis_stream.cc | 43 ++++++++++++++++++++++++++++
src/types/redis_stream.h | 2 ++
src/types/redis_stream_base.h | 2 +-
tests/gocase/unit/type/stream/stream_test.go | 21 ++++++++++++++
5 files changed, 100 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index a950a30d..57b21513 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -258,6 +258,30 @@ class CommandXGroup : public Commander {
return Status::OK();
}
+ if (subcommand_ == "setid") {
+ if (args.size() != 5 && args.size() != 7) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+
+ xgroup_create_options_.last_id = GET_OR_RET(parser.TakeStr());
+ if (args.size() == 7) {
+ if (parser.EatEqICase("entriesread")) {
+ auto parse_result = parser.TakeInt<int64_t>();
+ if (!parse_result.IsOK()) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+ if (parse_result.GetValue() < 0 && parse_result.GetValue() != -1) {
+ return {Status::RedisParseErr, "value for ENTRIESREAD must be
positive or -1"};
+ }
+ xgroup_create_options_.entries_read = parse_result.GetValue();
+ } else {
+ return parser.InvalidSyntax();
+ }
+ }
+
+ return Status::OK();
+ }
+
return {Status::RedisParseErr, "unknown subcommand"};
}
@@ -297,6 +321,15 @@ class CommandXGroup : public Commander {
*output = redis::Integer(created_number);
}
+ if (subcommand_ == "setid") {
+ auto s = stream_db.GroupSetId(stream_name_, group_name_,
xgroup_create_options_);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ *output = redis::SimpleString("OK");
+ }
+
return Status::OK();
}
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index daa5fd41..ddf63c91 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -387,6 +387,49 @@ rocksdb::Status Stream::CreateConsumer(const Slice
&stream_name, const std::stri
return s;
}
+rocksdb::Status Stream::GroupSetId(const Slice &stream_name, const std::string
&group_name,
+ const StreamXGroupCreateOptions &options) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata;
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+ }
+
+ std::string entry_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_entry_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&get_entry_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+
+ StreamConsumerGroupMetadata consumer_group_metadata =
decodeStreamConsumerGroupMetadataValue(get_entry_value);
+ if (options.last_id == "$") {
+ consumer_group_metadata.last_delivered_id = metadata.last_entry_id;
+ } else {
+ auto s = ParseStreamEntryID(options.last_id,
&consumer_group_metadata.last_delivered_id);
+ if (!s.IsOK()) {
+ return rocksdb::Status::InvalidArgument(s.Msg());
+ }
+ }
+ consumer_group_metadata.entries_read = options.entries_read;
+ std::string entry_value =
encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+ batch->Put(stream_cf_handle_, entry_key, entry_value);
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
rocksdb::Status Stream::DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids,
uint64_t *deleted_cnt) {
*deleted_cnt = 0;
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 75a51ece..76bc17a6 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -44,6 +44,8 @@ class Stream : public SubKeyScanner {
rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string
&group_name, uint64_t *delete_cnt);
rocksdb::Status CreateConsumer(const Slice &stream_name, const std::string
&group_name,
const std::string &consumer_name, int
*created_number);
+ rocksdb::Status GroupSetId(const Slice &stream_name, const std::string
&group_name,
+ const StreamXGroupCreateOptions &options);
rocksdb::Status DeleteEntries(const Slice &stream_name, const
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions
&options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t
count, StreamInfo *info);
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index f5d4496d..9899f0e8 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -165,7 +165,7 @@ struct StreamConsumerGroupMetadata {
uint64_t consumer_number = 0;
uint64_t pending_number = 0;
StreamEntryID last_delivered_id;
- int64_t entries_read = 0;
+ int64_t entries_read = -1;
uint64_t lag = 0;
};
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 8b2b67d3..05822387 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -908,6 +908,27 @@ func TestStreamOffset(t *testing.T) {
r = rdb.XGroupCreateConsumer(ctx, streamName, groupName,
consumerName).Val()
require.Equal(t, int64(0), r)
})
+
+ t.Run("XGROUP SETID with different kinds of commands", func(t
*testing.T) {
+ streamName := "test-stream"
+ groupName := "test-group"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ //No such stream
+ require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName,
"$").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "1-0",
+ Values: []string{"data", "a"},
+ }).Err())
+ //No such group
+ require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName,
"$").Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
+
+ require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName,
"0-0").Err())
+ require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entries", "100").Err())
+ require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entriesread", "-100").Err())
+ require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entriesread", "100").Err())
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {