This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 89c5b241 chore(tests): add basic tests for the stream consumer group  (#2533)
89c5b241 is described below

commit 89c5b2419bc66e35c5a2ff0c2d5d581033486ae4
Author: Jonathan Chen <[email protected]>
AuthorDate: Mon Sep 16 20:36:54 2024 -0400

    chore(tests): add basic tests for the stream consumer group  (#2533)
---
 tests/gocase/unit/type/stream/stream_test.go | 108 +++++++++++++++++++++++++++
 1 file changed, 108 insertions(+)

diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 943127fc..b50b230f 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1119,6 +1119,114 @@ func TestStreamOffset(t *testing.T) {
                require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
        })
 
+       t.Run("XINFO Test idle time and pending messages, for issue #2478", func(t *testing.T) {
+               streamName := "test-stream-2478"
+               groupName := "test-group-2478"
+               consumerName := "test-consumer-2478"
+
+               rdb.Del(ctx, streamName)
+               rdb.XGroupDestroy(ctx, streamName, groupName)
+
+               for i := 1; i <= 5; i++ {
+                       require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                               Stream: streamName,
+                               ID:     fmt.Sprintf("%d-0", i),
+                               Values: map[string]interface{}{"field": fmt.Sprintf("value%d", i)},
+                       }).Err())
+               }
+
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+               r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    5,
+               }).Result()
+               require.NoError(t, err)
+               require.Len(t, r[0].Messages, 5)
+
+               time.Sleep(2 * time.Second)
+
+               consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
+               require.NoError(t, err)
+
+               var consumerInfo redis.XInfoConsumer
+               for _, c := range consumers {
+                       if c.Name == consumerName {
+                               consumerInfo = c
+                               break
+                       }
+               }
+
+               require.True(t, consumerInfo.Idle >= 2000)
+               require.Equal(t, int64(5), consumerInfo.Pending)
+
+               ackIDs := make([]string, 5)
+               for i := 1; i <= 5; i++ {
+                       ackIDs[i-1] = fmt.Sprintf("%d-0", i)
+               }
+               require.NoError(t, rdb.XAck(ctx, streamName, groupName, ackIDs...).Err())
+
+               consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result()
+               require.NoError(t, err)
+
+               for _, c := range consumers {
+                       if c.Name == consumerName {
+                               consumerInfo = c
+                               break
+                       }
+               }
+
+               require.Equal(t, int64(0), consumerInfo.Pending)
+       })
+
+       t.Run("XINFO Test consumer removal and inactive time, for issue #2478", func(t *testing.T) {
+               streamName := "stream-test-2478"
+               groupName := "group-test-2478"
+               consumerName := "consumer-test-2478"
+
+               rdb.Del(ctx, streamName)
+               rdb.XGroupDestroy(ctx, streamName, groupName)
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "*",
+                       Values: map[string]interface{}{"field": "value"},
+               }).Err())
+
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+               _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+               }).Result()
+               require.NoError(t, err)
+
+               time.Sleep(500 * time.Millisecond)
+
+               consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
+               require.NoError(t, err)
+
+               var consumerInfo redis.XInfoConsumer
+               for _, c := range consumers {
+                       if c.Name == consumerName {
+                               consumerInfo = c
+                               break
+                       }
+               }
+
+               require.Equal(t, consumerName, consumerInfo.Name)
+               require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err())
+
+               consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result()
+               require.NoError(t, err)
+
+               for _, c := range consumers {
+                       require.NotEqual(t, consumerName, c.Name)
+               }
+       })
+
        t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) {
                streamName := "test-stream"
                group := "group"

Reply via email to