This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new eaf6b2e2 fix(stream): make consumer decrement pending number when 
message is acknowledged. (#2352)
eaf6b2e2 is described below

commit eaf6b2e2d292b980e6c65b8f9ec855ae2388ace9
Author: Edward Xu <[email protected]>
AuthorDate: Mon Jun 3 13:13:54 2024 +0800

    fix(stream): make consumer decrement pending number when message is 
acknowledged. (#2352)
    
    Co-authored-by: Twice <[email protected]>
---
 src/types/redis_stream.cc                    | 22 +++++++++++++++++
 tests/gocase/unit/type/stream/stream_test.go | 35 ++++++++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 0ec71270..8b624a2e 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -339,13 +339,21 @@ rocksdb::Status Stream::DeletePelEntries(const Slice 
&stream_name, const std::st
   WriteBatchLogData log_data(kRedisStream);
   batch->PutLogData(log_data.Encode());
 
+  std::map<std::string, uint64_t> consumer_acknowledges;
   for (const auto &id : entry_ids) {
     std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, 
metadata, group_name, id);
     std::string value;
     s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&value);
+    if (!s.ok() && !s.IsNotFound()) {
+      return s;
+    }
     if (s.ok()) {
       *acknowledged += 1;
       batch->Delete(stream_cf_handle_, entry_key);
+
+      // increment ack for each related consumer
+      auto pel_entry = decodeStreamPelEntryValue(value);
+      consumer_acknowledges[pel_entry.consumer_name]++;
     }
   }
   if (*acknowledged > 0) {
@@ -353,6 +361,20 @@ rocksdb::Status Stream::DeletePelEntries(const Slice 
&stream_name, const std::st
     group_metadata.pending_number -= *acknowledged;
     std::string group_value = 
encodeStreamConsumerGroupMetadataValue(group_metadata);
     batch->Put(stream_cf_handle_, group_key, group_value);
+
+    for (const auto &[consumer_name, ack_count] : consumer_acknowledges) {
+      auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
+      std::string consumer_meta_original;
+      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, 
consumer_meta_key, &consumer_meta_original);
+      if (!s.ok() && !s.IsNotFound()) {
+        return s;
+      }
+      if (s.ok()) {
+        auto consumer_metadata = 
decodeStreamConsumerMetadataValue(consumer_meta_original);
+        consumer_metadata.pending_number -= ack_count;
+        batch->Put(stream_cf_handle_, consumer_meta_key, 
encodeStreamConsumerMetadataValue(consumer_metadata));
+      }
+    }
   }
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index d6a09a9e..7297bb66 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1064,6 +1064,41 @@ func TestStreamOffset(t *testing.T) {
                require.Equal(t, consumer3, r1[0].Name)
        })
 
+       t.Run("XINFO after delete pending message and related consumer, for 
issue #2350", func(t *testing.T) {
+               streamName := "test-stream-2350"
+               groupName := "test-group-2350"
+               consumerName := "test-consumer-2350"
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, 
groupName, "$").Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "*",
+                       Values: []string{"testing", "overflow"},
+               }).Err())
+               readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               })
+               require.NoError(t, readRsp.Err())
+               require.Len(t, readRsp.Val(), 1)
+               streamRsp := readRsp.Val()[0]
+               require.Len(t, streamRsp.Messages, 1)
+               msgID := streamRsp.Messages[0]
+               require.NoError(t, rdb.XAck(ctx, streamName, groupName, 
msgID.ID).Err())
+               require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, 
groupName, consumerName).Err())
+               infoRsp := rdb.XInfoGroups(ctx, streamName)
+               require.NoError(t, infoRsp.Err())
+               infoGroups := infoRsp.Val()
+               require.Len(t, infoGroups, 1)
+               infoGroup := infoGroups[0]
+               require.Equal(t, groupName, infoGroup.Name)
+               require.Equal(t, int64(0), infoGroup.Consumers)
+               require.Equal(t, int64(0), infoGroup.Pending)
+               require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
+       })
+
        t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue 
#2109", func(t *testing.T) {
                streamName := "test-stream"
                group := "group"

Reply via email to