This is an automated email from the ASF dual-hosted git repository.
torwig pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 149d4fbd fix(stream): change XREADGROUP response for empty read
response (#2569)
149d4fbd is described below
commit 149d4fbdffd38ae34754313c4c8fdf5dd3f1ab9a
Author: Jonathan Chen <[email protected]>
AuthorDate: Thu Oct 3 09:53:00 2024 -0400
fix(stream): change XREADGROUP response for empty read response (#2569)
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_stream.cc | 14 +++++++++++--
tests/gocase/unit/type/stream/stream_test.go | 30 ++++++++++++++++++++++++++++
2 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f02d5b34..c9616db5 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1537,7 +1537,12 @@ class CommandXReadGroup : public Commander,
if (block_ && results.empty()) {
if (conn->IsInExec()) {
- *output = redis::MultiLen(-1);
+ output->append(redis::MultiLen(streams_.size()));
+ for (const auto &stream_name : streams_) {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(stream_name));
+ output->append(redis::MultiLen(0));
+ }
return Status::OK(); // No blocking in multi-exec
}
@@ -1545,7 +1550,12 @@ class CommandXReadGroup : public Commander,
}
if (!block_ && results.empty()) {
- *output = redis::MultiLen(-1);
+ output->append(redis::MultiLen(streams_.size()));
+ for (const auto &stream_name : streams_) {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(stream_name));
+ output->append(redis::MultiLen(0));
+ }
return Status::OK();
}
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index b50b230f..d852bbcf 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1016,6 +1016,36 @@ func TestStreamOffset(t *testing.T) {
require.Equal(t, int64(0), ri[0].Pending)
})
+ t.Run("XREADGROUP with empty streams returns empty arrays", func(t
*testing.T) {
+ streamName := "test-stream-empty1"
+ streamName2 := "test-stream-empty2"
+ groupName := "test-group-empty"
+ consumerName := "test-consumer-empty"
+
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName,
groupName, "0").Err())
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName2,
groupName, "0").Err())
+
+ res, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, streamName2, "0", "0"},
+ }).Result()
+ require.NoError(t, err)
+
+ expectedRes := []redis.XStream{
+ {
+ Stream: streamName,
+ Messages: []redis.XMessage{},
+ },
+ {
+ Stream: streamName2,
+ Messages: []redis.XMessage{},
+ },
+ }
+
+ require.Equal(t, expectedRes, res)
+ })
+
t.Run("XGROUP SETID with different kinds of commands", func(t
*testing.T) {
streamName := "test-stream"
groupName := "test-group"