This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new d0f43549 Add support of the XACK command (#2169)
d0f43549 is described below

commit d0f435491862b3dd201c3f8ce7c194e3a33f1ffc
Author: Hauru <[email protected]>
AuthorDate: Fri Mar 15 08:45:46 2024 +0800

    Add support of the XACK command (#2169)
---
 src/commands/cmd_stream.cc                   | 38 +++++++++++++++++++-
 src/types/redis_stream.cc                    | 42 ++++++++++++++++++++++
 src/types/redis_stream.h                     |  2 ++
 tests/gocase/unit/type/stream/stream_test.go | 54 ++++++++++++++++++++++++++++
 4 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index a90caece..f8736168 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -31,6 +31,41 @@
 
 namespace redis {
 
+// XACK <stream> <group> <id> [<id> ...]
+//
+// Parses the stream name, the group name and every entry ID from the argument list, then asks the stream
+// storage layer to delete the matching PEL entries and replies with the integer count it reports.
+class CommandXAck : public Commander {
+ public:
+  // Stores args[1] as the stream name, args[2] as the group name and every remaining argument as an entry ID.
+  // Returns RedisParseErr carrying the parser's message for the first ID that fails to parse.
+  Status Parse(const std::vector<std::string> &args) override {
+    stream_name_ = args[1];
+    group_name_ = args[2];
+
+    // The scratch ID is shared by all iterations and copied into the list after each successful parse.
+    StreamEntryID parsed_id;
+    for (auto arg_it = args.begin() + 3; arg_it != args.end(); ++arg_it) {
+      auto parse_status = ParseStreamEntryID(*arg_it, &parsed_id);
+      if (!parse_status.IsOK()) {
+        return {Status::RedisParseErr, parse_status.Msg()};
+      }
+      entry_ids_.push_back(parsed_id);
+    }
+
+    return Status::OK();
+  }
+
+  // Runs the acknowledgement against the connection's namespace and writes the number of acknowledged
+  // entries to `output` as an integer reply. A storage failure is reported as RedisExecErr.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    uint64_t acked_count = 0;
+    redis::Stream stream_db(srv->storage, conn->GetNamespace());
+
+    auto db_status = stream_db.DeletePelEntries(stream_name_, group_name_, entry_ids_, &acked_count);
+    if (db_status.ok()) {
+      *output = redis::Integer(acked_count);
+      return Status::OK();
+    }
+
+    return {Status::RedisExecErr, db_status.ToString()};
+  }
+
+ private:
+  std::string stream_name_;
+  std::string group_name_;
+  std::vector<StreamEntryID> entry_ids_;
+};
+
 class CommandXAdd : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -1496,7 +1531,8 @@ class CommandXSetId : public Commander {
   std::optional<uint64_t> entries_added_;
 };
 
-REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
+REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write 
no-dbsize-check", 1, 1, 1),
+                        MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
                         MakeCmdAttr<CommandXDel>("xdel", -3, "write 
no-dbsize-check", 1, 1, 1),
                         MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 
2, 1),
                         MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 
1, 1),
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index afc26a5f..aedcdfe8 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -314,6 +314,48 @@ StreamSubkeyType Stream::identifySubkeyType(const 
rocksdb::Slice &key) const {
   return StreamSubkeyType::StreamConsumerMetadata;
 }
 
+// Removes the given entry IDs from the pending entries list (PEL) of a consumer group and lowers the
+// group's pending counter accordingly.
+//
+//   stream_name   stream key as given by the client; the namespace prefix is added here.
+//   group_name    consumer group whose PEL records are removed.
+//   entry_ids     IDs to acknowledge; an ID without a stored PEL record is skipped.
+//   acknowledged  out: number of PEL records removed. Reset to 0 on entry and only set to the final
+//                 count after the batch has been written successfully.
+//
+// Returns OK (with *acknowledged == 0) when the stream or the group does not exist. Any other storage
+// error, including a failed PEL lookup, is returned to the caller unchanged.
+rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::string &group_name,
+                                         const std::vector<StreamEntryID> &entry_ids, uint64_t *acknowledged) {
+  *acknowledged = 0;
+
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok()) {
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
+  if (!s.ok()) {
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  // Count locally so the out-parameter is never left half-updated when a lookup or the write fails.
+  uint64_t acked = 0;
+  for (const auto &id : entry_ids) {
+    std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
+    std::string value;
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &value);
+    if (s.IsNotFound()) {
+      continue;  // nothing pending under this ID
+    }
+    if (!s.ok()) {
+      return s;  // a real storage error must not be reported as "not pending"
+    }
+    ++acked;
+    batch->Delete(stream_cf_handle_, entry_key);
+  }
+  // NOTE(review): the lookups above read the stored state while the deletes are only queued in `batch`,
+  // so an ID listed twice in `entry_ids` is counted twice. De-duplicating the IDs would make the reply
+  // count exact; until then the counter update below saturates instead of going below zero.
+  if (acked > 0) {
+    StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
+    if (group_metadata.pending_number > acked) {
+      group_metadata.pending_number -= acked;
+    } else {
+      group_metadata.pending_number = 0;
+    }
+    std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata);
+    batch->Put(stream_cf_handle_, group_key, group_value);
+  }
+
+  s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+  if (s.ok()) {
+    *acknowledged = acked;
+  }
+  return s;
+}
+
 rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
                                     const std::string &group_name) {
   if (std::isdigit(group_name[0])) {
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 8fa7bb70..f569c253 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -49,6 +49,8 @@ class Stream : public SubKeyScanner {
   rocksdb::Status GroupSetId(const Slice &stream_name, const std::string 
&group_name,
                              const StreamXGroupCreateOptions &options);
   rocksdb::Status DeleteEntries(const Slice &stream_name, const 
std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
+  rocksdb::Status DeletePelEntries(const Slice &stream_name, const std::string 
&group_name,
+                                   const std::vector<StreamEntryID> 
&entry_ids, uint64_t *acknowledged);
   rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions 
&options, uint64_t *size);
   rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t 
count, StreamInfo *info);
   rocksdb::Status GetGroupInfo(const Slice &stream_name,
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index a48ce651..ad6189db 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1240,6 +1240,60 @@ func TestStreamOffset(t *testing.T) {
                require.NoError(t, erri)
                require.Equal(t, int64(1), ri[0].Consumers)
        })
+
+       // XACK against a deleted stream acknowledges nothing. Once entries have been delivered to a consumer
+       // through XREADGROUP, XACK must report one acknowledgement per delivered ID, both for a single ID
+       // and for several IDs passed in one call.
+       t.Run("XACK with different kinds of commands", func(t *testing.T) {
+               const (
+                       streamName   = "mystream"
+                       groupName    = "mygroup"
+                       consumerName = "myconsumer"
+               )
+
+               // addEntry appends one entry with a fixed field/value pair under the given explicit ID.
+               addEntry := func(id string) {
+                       require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     id,
+                               Values: []string{"field1", "data1"},
+                       }).Err())
+               }
+               // readNew delivers up to `count` never-delivered entries to the consumer, without NOACK.
+               readNew := func(count int64) {
+                       require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                               Group:    groupName,
+                               Consumer: consumerName,
+                               Streams:  []string{streamName, ">"},
+                               Count:    count,
+                               NoAck:    false,
+                       }).Err())
+               }
+               // ackAndExpect acknowledges the IDs and checks the integer reply.
+               ackAndExpect := func(want int64, ids ...string) {
+                       got, err := rdb.XAck(ctx, streamName, groupName, ids...).Result()
+                       require.NoError(t, err)
+                       require.Equal(t, want, got)
+               }
+
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               ackAndExpect(0, "0-0")
+
+               addEntry("1-0")
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+               readNew(1)
+               ackAndExpect(1, "1-0")
+
+               for _, id := range []string{"2-0", "3-0", "4-0"} {
+                       addEntry(id)
+               }
+               readNew(3)
+               ackAndExpect(3, "2-0", "3-0", "4-0")
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to