This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 89c5b241 chore(tests): add basic tests for the stream consumer group (#2533)
89c5b241 is described below
commit 89c5b2419bc66e35c5a2ff0c2d5d581033486ae4
Author: Jonathan Chen <[email protected]>
AuthorDate: Mon Sep 16 20:36:54 2024 -0400
chore(tests): add basic tests for the stream consumer group (#2533)
---
tests/gocase/unit/type/stream/stream_test.go | 108 +++++++++++++++++++++++++++
1 file changed, 108 insertions(+)
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 943127fc..b50b230f 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1119,6 +1119,114 @@ func TestStreamOffset(t *testing.T) {
require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
})
+	// Covers issue #2478: after XREADGROUP delivers entries to a consumer,
+	// XINFO CONSUMERS must report that consumer's idle time and pending count,
+	// and the pending count must drop to zero once the entries are XACKed.
+	t.Run("XINFO Test idle time and pending messages, for issue #2478", func(t *testing.T) {
+		const (
+			streamName   = "test-stream-2478"
+			groupName    = "test-group-2478"
+			consumerName = "test-consumer-2478"
+			entryCount   = 5
+			// idleWait is how long the consumer is left untouched before its
+			// idle time is inspected.
+			idleWait = 2 * time.Second
+		)
+
+		// Best-effort cleanup of leftovers from earlier runs; the results are
+		// deliberately not checked.
+		rdb.Del(ctx, streamName)
+		rdb.XGroupDestroy(ctx, streamName, groupName)
+
+		// findConsumer queries XINFO CONSUMERS and returns the entry for
+		// consumerName. It fails the test when the consumer is not listed, so
+		// callers never assert against stale or zero-valued data.
+		findConsumer := func() redis.XInfoConsumer {
+			consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
+			require.NoError(t, err)
+			for _, c := range consumers {
+				if c.Name == consumerName {
+					return c
+				}
+			}
+			t.Fatalf("consumer %q not found in group %q", consumerName, groupName)
+			return redis.XInfoConsumer{}
+		}
+
+		// Add entries with explicit IDs 1-0 .. 5-0 and remember them for XACK.
+		ids := make([]string, 0, entryCount)
+		for i := 1; i <= entryCount; i++ {
+			id := fmt.Sprintf("%d-0", i)
+			require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     id,
+				Values: map[string]interface{}{"field": fmt.Sprintf("value%d", i)},
+			}).Err())
+			ids = append(ids, id)
+		}
+
+		require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+		r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+			Group:    groupName,
+			Consumer: consumerName,
+			Streams:  []string{streamName, ">"},
+			Count:    entryCount,
+		}).Result()
+		require.NoError(t, err)
+		// Guard the r[0] access so an empty reply is a clean failure, not a panic.
+		require.NotEmpty(t, r)
+		require.Len(t, r[0].Messages, entryCount)
+
+		time.Sleep(idleWait)
+
+		consumerInfo := findConsumer()
+		// NOTE(review): assumes go-redis v9, where XInfoConsumer.Idle is a
+		// time.Duration - confirm against the imported client version. With a
+		// Duration, the previous bare constant 2000 meant 2000ns rather than
+		// 2000ms, so the check could hardly fail.
+		require.True(t, consumerInfo.Idle >= idleWait,
+			"idle time %v is shorter than the %v wait", consumerInfo.Idle, idleWait)
+		require.Equal(t, int64(entryCount), consumerInfo.Pending)
+
+		// Acknowledging every delivered entry must empty the pending list.
+		require.NoError(t, rdb.XAck(ctx, streamName, groupName, ids...).Err())
+		require.Equal(t, int64(0), findConsumer().Pending)
+	})
+
+	// Covers issue #2478: a consumer created by XREADGROUP must be listed by
+	// XINFO CONSUMERS, and must no longer be listed after XGROUP DELCONSUMER.
+	t.Run("XINFO Test consumer removal and inactive time, for issue #2478", func(t *testing.T) {
+		const (
+			stream   = "stream-test-2478"
+			group    = "group-test-2478"
+			consumer = "consumer-test-2478"
+		)
+
+		// Best-effort cleanup of leftovers from earlier runs; results are not checked.
+		rdb.Del(ctx, stream)
+		rdb.XGroupDestroy(ctx, stream, group)
+
+		// One auto-ID entry is enough to have something to deliver.
+		entry := &redis.XAddArgs{
+			Stream: stream,
+			ID:     "*",
+			Values: map[string]interface{}{"field": "value"},
+		}
+		require.NoError(t, rdb.XAdd(ctx, entry).Err())
+		require.NoError(t, rdb.XGroupCreate(ctx, stream, group, "0").Err())
+
+		// Reading through the group is what makes the consumer appear in it.
+		readArgs := &redis.XReadGroupArgs{
+			Group:    group,
+			Consumer: consumer,
+			Streams:  []string{stream, ">"},
+			Count:    1,
+		}
+		require.NoError(t, rdb.XReadGroup(ctx, readArgs).Err())
+
+		// Let some time pass before inspecting the consumer. Note that no
+		// idle/inactive value is asserted below, only the consumer's presence.
+		time.Sleep(500 * time.Millisecond)
+
+		listed, err := rdb.XInfoConsumers(ctx, stream, group).Result()
+		require.NoError(t, err)
+
+		// Pick out our consumer; found keeps its zero value when it is missing,
+		// which the name check right after turns into a failure.
+		var found redis.XInfoConsumer
+		for i := range listed {
+			if listed[i].Name != consumer {
+				continue
+			}
+			found = listed[i]
+			break
+		}
+		require.Equal(t, consumer, found.Name)
+
+		require.NoError(t, rdb.XGroupDelConsumer(ctx, stream, group, consumer).Err())
+
+		// After deletion the consumer must not show up under any entry.
+		listed, err = rdb.XInfoConsumers(ctx, stream, group).Result()
+		require.NoError(t, err)
+		for i := range listed {
+			require.NotEqual(t, consumer, listed[i].Name)
+		}
+	})
+
t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue
#2109", func(t *testing.T) {
streamName := "test-stream"
group := "group"