This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new eaf6b2e2 fix(stream): make consumer decrement pending number when
message is acknowledged. (#2352)
eaf6b2e2 is described below
commit eaf6b2e2d292b980e6c65b8f9ec855ae2388ace9
Author: Edward Xu <[email protected]>
AuthorDate: Mon Jun 3 13:13:54 2024 +0800
fix(stream): make consumer decrement pending number when message is
acknowledged. (#2352)
Co-authored-by: Twice <[email protected]>
---
src/types/redis_stream.cc | 22 +++++++++++++++++
tests/gocase/unit/type/stream/stream_test.go | 35 ++++++++++++++++++++++++++++
2 files changed, 57 insertions(+)
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 0ec71270..8b624a2e 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -339,13 +339,21 @@ rocksdb::Status Stream::DeletePelEntries(const Slice
&stream_name, const std::st
WriteBatchLogData log_data(kRedisStream);
batch->PutLogData(log_data.Encode());
+ std::map<std::string, uint64_t> consumer_acknowledges;
for (const auto &id : entry_ids) {
std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key,
metadata, group_name, id);
std::string value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
if (s.ok()) {
*acknowledged += 1;
batch->Delete(stream_cf_handle_, entry_key);
+
+ // increment ack for each related consumer
+ auto pel_entry = decodeStreamPelEntryValue(value);
+ consumer_acknowledges[pel_entry.consumer_name]++;
}
}
if (*acknowledged > 0) {
@@ -353,6 +361,20 @@ rocksdb::Status Stream::DeletePelEntries(const Slice
&stream_name, const std::st
group_metadata.pending_number -= *acknowledged;
std::string group_value =
encodeStreamConsumerGroupMetadataValue(group_metadata);
batch->Put(stream_cf_handle_, group_key, group_value);
+
+ for (const auto &[consumer_name, ack_count] : consumer_acknowledges) {
+ auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string consumer_meta_original;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_,
consumer_meta_key, &consumer_meta_original);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.ok()) {
+ auto consumer_metadata =
decodeStreamConsumerMetadataValue(consumer_meta_original);
+ consumer_metadata.pending_number -= ack_count;
+ batch->Put(stream_cf_handle_, consumer_meta_key,
encodeStreamConsumerMetadataValue(consumer_metadata));
+ }
+ }
}
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index d6a09a9e..7297bb66 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1064,6 +1064,41 @@ func TestStreamOffset(t *testing.T) {
require.Equal(t, consumer3, r1[0].Name)
})
+ t.Run("XINFO after delete pending message and related consumer, for
issue #2350", func(t *testing.T) {
+ streamName := "test-stream-2350"
+ groupName := "test-group-2350"
+ consumerName := "test-consumer-2350"
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName,
groupName, "$").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "*",
+ Values: []string{"testing", "overflow"},
+ }).Err())
+ readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ })
+ require.NoError(t, readRsp.Err())
+ require.Len(t, readRsp.Val(), 1)
+ streamRsp := readRsp.Val()[0]
+ require.Len(t, streamRsp.Messages, 1)
+ msgID := streamRsp.Messages[0]
+ require.NoError(t, rdb.XAck(ctx, streamName, groupName,
msgID.ID).Err())
+ require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName,
groupName, consumerName).Err())
+ infoRsp := rdb.XInfoGroups(ctx, streamName)
+ require.NoError(t, infoRsp.Err())
+ infoGroups := infoRsp.Val()
+ require.Len(t, infoGroups, 1)
+ infoGroup := infoGroups[0]
+ require.Equal(t, groupName, infoGroup.Name)
+ require.Equal(t, int64(0), infoGroup.Consumers)
+ require.Equal(t, int64(0), infoGroup.Pending)
+ require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
+ })
+
t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue
#2109", func(t *testing.T) {
streamName := "test-stream"
group := "group"