This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 31d62fd9 Add support of the XGROUP SETID command (#1833)
31d62fd9 is described below

commit 31d62fd9a57cffe181b741ba57e9d80a0c41d392
Author: Hauru <[email protected]>
AuthorDate: Thu Oct 19 21:24:59 2023 +0800

    Add support of the XGROUP SETID command (#1833)
---
 src/commands/cmd_stream.cc                   | 33 +++++++++++++++++++++
 src/types/redis_stream.cc                    | 43 ++++++++++++++++++++++++++++
 src/types/redis_stream.h                     |  2 ++
 src/types/redis_stream_base.h                |  2 +-
 tests/gocase/unit/type/stream/stream_test.go | 21 ++++++++++++++
 5 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index a950a30d..57b21513 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -258,6 +258,30 @@ class CommandXGroup : public Commander {
       return Status::OK();
     }
 
+    if (subcommand_ == "setid") {
+      if (args.size() != 5 && args.size() != 7) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      xgroup_create_options_.last_id = GET_OR_RET(parser.TakeStr());
+      if (args.size() == 7) {
+        if (parser.EatEqICase("entriesread")) {
+          auto parse_result = parser.TakeInt<int64_t>();
+          if (!parse_result.IsOK()) {
+            return {Status::RedisParseErr, errValueNotInteger};
+          }
+          if (parse_result.GetValue() < 0 && parse_result.GetValue() != -1) {
+            return {Status::RedisParseErr, "value for ENTRIESREAD must be 
positive or -1"};
+          }
+          xgroup_create_options_.entries_read = parse_result.GetValue();
+        } else {
+          return parser.InvalidSyntax();
+        }
+      }
+
+      return Status::OK();
+    }
+
     return {Status::RedisParseErr, "unknown subcommand"};
   }
 
@@ -297,6 +321,15 @@ class CommandXGroup : public Commander {
       *output = redis::Integer(created_number);
     }
 
+    if (subcommand_ == "setid") {
+      auto s = stream_db.GroupSetId(stream_name_, group_name_, 
xgroup_create_options_);
+      if (!s.ok()) {
+        return {Status::RedisExecErr, s.ToString()};
+      }
+
+      *output = redis::SimpleString("OK");
+    }
+
     return Status::OK();
   }
 
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index daa5fd41..ddf63c91 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -387,6 +387,49 @@ rocksdb::Status Stream::CreateConsumer(const Slice 
&stream_name, const std::stri
   return s;
 }
 
+rocksdb::Status Stream::GroupSetId(const Slice &stream_name, const std::string 
&group_name,
+                                   const StreamXGroupCreateOptions &options) {
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return 
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  std::string entry_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string get_entry_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&get_entry_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+
+  StreamConsumerGroupMetadata consumer_group_metadata = 
decodeStreamConsumerGroupMetadataValue(get_entry_value);
+  if (options.last_id == "$") {
+    consumer_group_metadata.last_delivered_id = metadata.last_entry_id;
+  } else {
+    auto s = ParseStreamEntryID(options.last_id, 
&consumer_group_metadata.last_delivered_id);
+    if (!s.IsOK()) {
+      return rocksdb::Status::InvalidArgument(s.Msg());
+    }
+  }
+  consumer_group_metadata.entries_read = options.entries_read;
+  std::string entry_value = 
encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+  batch->Put(stream_cf_handle_, entry_key, entry_value);
+  return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
 rocksdb::Status Stream::DeleteEntries(const Slice &stream_name, const 
std::vector<StreamEntryID> &ids,
                                       uint64_t *deleted_cnt) {
   *deleted_cnt = 0;
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 75a51ece..76bc17a6 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -44,6 +44,8 @@ class Stream : public SubKeyScanner {
   rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string 
&group_name, uint64_t *delete_cnt);
   rocksdb::Status CreateConsumer(const Slice &stream_name, const std::string 
&group_name,
                                  const std::string &consumer_name, int 
*created_number);
+  rocksdb::Status GroupSetId(const Slice &stream_name, const std::string 
&group_name,
+                             const StreamXGroupCreateOptions &options);
   rocksdb::Status DeleteEntries(const Slice &stream_name, const 
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
   rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions 
&options, uint64_t *size);
   rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t 
count, StreamInfo *info);
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index f5d4496d..9899f0e8 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -165,7 +165,7 @@ struct StreamConsumerGroupMetadata {
   uint64_t consumer_number = 0;
   uint64_t pending_number = 0;
   StreamEntryID last_delivered_id;
-  int64_t entries_read = 0;
+  int64_t entries_read = -1;
   uint64_t lag = 0;
 };
 
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 8b2b67d3..05822387 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -908,6 +908,27 @@ func TestStreamOffset(t *testing.T) {
                r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, 
consumerName).Val()
                require.Equal(t, int64(0), r)
        })
+
+       t.Run("XGROUP SETID with different kinds of commands", func(t 
*testing.T) {
+               streamName := "test-stream"
+               groupName := "test-group"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               //No such stream
+               require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, 
"$").Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"data", "a"},
+               }).Err())
+               //No such group
+               require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, 
"$").Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"$").Err())
+
+               require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName, 
"0-0").Err())
+               require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, 
groupName, "$", "entries", "100").Err())
+               require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, 
groupName, "$", "entriesread", "-100").Err())
+               require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName, 
groupName, "$", "entriesread", "100").Err())
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to