This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch 2.10
in repository https://gitbox.apache.org/repos/asf/kvrocks.git

commit 8b2b89f06fd03627dd3e004438cdeed995e6d5c1
Author: Jonathan Chen <[email protected]>
AuthorDate: Thu Oct 3 09:53:00 2024 -0400

    fix(stream): change XREADGROUP response for empty read response (#2569)

    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_stream.cc                   | 14 ++++++++++++--
 tests/gocase/unit/type/stream/stream_test.go | 30 ++++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f02d5b34..c9616db5 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1537,7 +1537,12 @@ class CommandXReadGroup : public Commander,
 
     if (block_ && results.empty()) {
       if (conn->IsInExec()) {
-        *output = redis::MultiLen(-1);
+        output->append(redis::MultiLen(streams_.size()));
+        for (const auto &stream_name : streams_) {
+          output->append(redis::MultiLen(2));
+          output->append(redis::BulkString(stream_name));
+          output->append(redis::MultiLen(0));
+        }
         return Status::OK(); // No blocking in multi-exec
       }
 
@@ -1545,7 +1550,12 @@ class CommandXReadGroup : public Commander,
     }
 
     if (!block_ && results.empty()) {
-      *output = redis::MultiLen(-1);
+      output->append(redis::MultiLen(streams_.size()));
+      for (const auto &stream_name : streams_) {
+        output->append(redis::MultiLen(2));
+        output->append(redis::BulkString(stream_name));
+        output->append(redis::MultiLen(0));
+      }
       return Status::OK();
     }
 
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index b50b230f..d852bbcf 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1016,6 +1016,36 @@ func TestStreamOffset(t *testing.T) {
 		require.Equal(t, int64(0), ri[0].Pending)
 	})
 
+	t.Run("XREADGROUP with empty streams returns empty arrays", func(t *testing.T) {
+		streamName := "test-stream-empty1"
+		streamName2 := "test-stream-empty2"
+		groupName := "test-group-empty"
+		consumerName := "test-consumer-empty"
+
+		require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err())
+		require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName2, groupName, "0").Err())
+
+		res, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+			Group:    groupName,
+			Consumer: consumerName,
+			Streams:  []string{streamName, streamName2, "0", "0"},
+		}).Result()
+		require.NoError(t, err)
+
+		expectedRes := []redis.XStream{
+			{
+				Stream:   streamName,
+				Messages: []redis.XMessage{},
+			},
+			{
+				Stream:   streamName2,
+				Messages: []redis.XMessage{},
+			},
+		}
+
+		require.Equal(t, expectedRes, res)
+	})
+
 	t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) {
 		streamName := "test-stream"
 		groupName := "test-group"
