This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 2c07b160 Fix XREADGROUP command didn't fetch the latest metadata after
creating a consumer (#2153)
2c07b160 is described below
commit 2c07b1607d7a5e60eaa7059773f522d4d7865e30
Author: Hauru <[email protected]>
AuthorDate: Mon Mar 11 20:18:02 2024 +0800
Fix XREADGROUP command didn't fetch the latest metadata after creating a
consumer (#2153)
---
src/types/redis_stream.cc | 1 +
tests/gocase/unit/type/stream/stream_test.go | 24 ++++++++++++++++++++++++
2 files changed, 25 insertions(+)
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 476c5ba4..afc26a5f 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1097,6 +1097,7 @@ rocksdb::Status Stream::RangeWithPending(const Slice
&stream_name, StreamRangeOp
if (!s.ok()) {
return s;
}
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
}
auto batch = storage_->GetWriteBatchBase();
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 66ae0705..a48ce651 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1216,6 +1216,30 @@ func TestStreamOffset(t *testing.T) {
Messages: []redis.XMessage{{ID: "5-0", Values:
map[string]interface{}(nil)}},
}}, r)
})
+
+ t.Run("Check xreadgroup fetches the newest data after create consumer
in the command", func(t *testing.T) {
+ streamName := "mystream"
+ groupName := "mygroup"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "1-0",
+ Values: []string{"field1", "data1"},
+ }).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ consumerName := "myconsumer"
+ err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Err()
+ require.NoError(t, err)
+ ri, erri := rdb.XInfoGroups(ctx, streamName).Result()
+ require.NoError(t, erri)
+ require.Equal(t, int64(1), ri[0].Consumers)
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {