This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch 2.10
in repository https://gitbox.apache.org/repos/asf/kvrocks.git

commit 8b2b89f06fd03627dd3e004438cdeed995e6d5c1
Author: Jonathan Chen <[email protected]>
AuthorDate: Thu Oct 3 09:53:00 2024 -0400

    fix(stream): change XREADGROUP response for empty read response (#2569)
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_stream.cc                   | 14 +++++++++++--
 tests/gocase/unit/type/stream/stream_test.go | 30 ++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f02d5b34..c9616db5 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1537,7 +1537,12 @@ class CommandXReadGroup : public Commander,
 
     if (block_ && results.empty()) {
       if (conn->IsInExec()) {
-        *output = redis::MultiLen(-1);
+        output->append(redis::MultiLen(streams_.size()));
+        for (const auto &stream_name : streams_) {
+          output->append(redis::MultiLen(2));
+          output->append(redis::BulkString(stream_name));
+          output->append(redis::MultiLen(0));
+        }
         return Status::OK();  // No blocking in multi-exec
       }
 
@@ -1545,7 +1550,12 @@ class CommandXReadGroup : public Commander,
     }
 
     if (!block_ && results.empty()) {
-      *output = redis::MultiLen(-1);
+      output->append(redis::MultiLen(streams_.size()));
+      for (const auto &stream_name : streams_) {
+        output->append(redis::MultiLen(2));
+        output->append(redis::BulkString(stream_name));
+        output->append(redis::MultiLen(0));
+      }
       return Status::OK();
     }
 
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index b50b230f..d852bbcf 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1016,6 +1016,36 @@ func TestStreamOffset(t *testing.T) {
                require.Equal(t, int64(0), ri[0].Pending)
        })
 
+       t.Run("XREADGROUP with empty streams returns empty arrays", func(t *testing.T) {
+               streamName := "test-stream-empty1"
+               streamName2 := "test-stream-empty2"
+               groupName := "test-group-empty"
+               consumerName := "test-consumer-empty"
+
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err())
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName2, groupName, "0").Err())
+
+               res, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, streamName2, "0", "0"},
+               }).Result()
+               require.NoError(t, err)
+
+               expectedRes := []redis.XStream{
+                       {
+                               Stream:   streamName,
+                               Messages: []redis.XMessage{},
+                       },
+                       {
+                               Stream:   streamName2,
+                               Messages: []redis.XMessage{},
+                       },
+               }
+
+               require.Equal(t, expectedRes, res)
+       })
+
        t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) {
                streamName := "test-stream"
                groupName := "test-group"

Reply via email to