This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 2c07b160 Fix XREADGROUP command didn't fetch the latest metadata after creating a consumer (#2153)
2c07b160 is described below

commit 2c07b1607d7a5e60eaa7059773f522d4d7865e30
Author: Hauru <[email protected]>
AuthorDate: Mon Mar 11 20:18:02 2024 +0800

    Fix XREADGROUP command didn't fetch the latest metadata after creating a consumer (#2153)
---
 src/types/redis_stream.cc                    |  1 +
 tests/gocase/unit/type/stream/stream_test.go | 24 ++++++++++++++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 476c5ba4..afc26a5f 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1097,6 +1097,7 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp
     if (!s.ok()) {
       return s;
     }
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
   }
 
   auto batch = storage_->GetWriteBatchBase();
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 66ae0705..a48ce651 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1216,6 +1216,30 @@ func TestStreamOffset(t *testing.T) {
                        Messages: []redis.XMessage{{ID: "5-0", Values: map[string]interface{}(nil)}},
                }}, r)
        })
+
+       t.Run("Check xreadgroup fetches the newest data after create consumer in the command", func(t *testing.T) {
+               streamName := "mystream"
+               groupName := "mygroup"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+               consumerName := "myconsumer"
+               err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Err()
+               require.NoError(t, err)
+               ri, erri := rdb.XInfoGroups(ctx, streamName).Result()
+               require.NoError(t, erri)
+               require.Equal(t, int64(1), ri[0].Consumers)
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to