This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 25216331b fix(stream): add missing return after empty response in
XREAD/XREADGROUP OnWrite (#3337)
25216331b is described below
commit 25216331bce6f9e97b945aad1cbb04e2db261287
Author: Nick Sweeting <[email protected]>
AuthorDate: Thu Jan 8 22:05:41 2026 -0500
fix(stream): add missing return after empty response in XREAD/XREADGROUP
OnWrite (#3337)
When the `OnWrite()` callback for blocking `XREAD`/`XREADGROUP` commands
receives empty results, it sends a null array response but then
unconditionally calls `SendReply()`, which sends an additional empty
array response.
This causes clients to receive two RESP responses (`*-1\r\n*0\r\n`) for
a single command, leading to protocol desynchronization.
### Root Cause
**CommandXReadGroup::OnWrite() - lines 1666-1670:**
```cpp
if (results.empty()) {
conn_->Reply(redis::MultiLen(-1)); // Sends *-1\r\n
// BUG: missing return here!
}
SendReply(results); // Always called, sends *0\r\n when results is empty
```
**CommandXRead::OnWrite() - lines 1354-1358:**
```cpp
if (results.empty()) {
conn_->Reply(conn_->NilArray()); // Sends *-1\r\n
// BUG: missing return here!
}
SendReply(results); // Always called, sends *0\r\n when results is empty
```
### Fix
Add `return;` statements after sending the empty response to prevent the
duplicate `SendReply()` call.
## Reproduction
The bug triggers under concurrent load when multiple clients are
blocking on `XREADGROUP BLOCK` while messages are being added via
`XADD`.
<details>
<summary>Reproduction script (Go)</summary>
```go
// Build: go build -o repro repro.go
// Run: ./repro localhost:6666
package main
import (
"bytes"
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
)
var host = "localhost:6666"
func encodeCommand(args ...string) []byte {
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("*%d\r\n", len(args)))
for _, arg := range args {
buf.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg))
}
return buf.Bytes()
}
func sendCommand(conn net.Conn, args ...string) ([]byte, error) {
conn.Write(encodeCommand(args...))
buf := make([]byte, 4096)
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
n, _ := conn.Read(buf)
return buf[:n], nil
}
func main() {
if len(os.Args) > 1 {
host = os.Args[1]
}
stream, group := "test_stream", "test_group"
// Setup
conn, _ := net.Dial("tcp", host)
sendCommand(conn, "DEL", stream)
sendCommand(conn, "XGROUP", "CREATE", stream, group, "0",
"MKSTREAM")
conn.Close()
var errors []string
var mu sync.Mutex
var wg sync.WaitGroup
// 5 concurrent blocking readers
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
conn, _ := net.Dial("tcp", host)
defer conn.Close()
for j := 0; j < 100; j++ {
cmd := encodeCommand("XREADGROUP", "GROUP",
group, fmt.Sprintf("reader%d", id),
"COUNT", "1", "BLOCK", "50",
"STREAMS", stream, ">")
conn.Write(cmd)
buf := make([]byte, 4096)
conn.SetReadDeadline(time.Now().Add(2 *
time.Second))
n, _ := conn.Read(buf)
if bytes.Contains(buf[:n],
[]byte("*-1\r\n*0\r\n")) {
mu.Lock()
errors = append(errors,
fmt.Sprintf("Reader %d: got *-1\\r\\n*0\\r\\n", id))
mu.Unlock()
fmt.Printf("[BUG] Reader %d: received double response *-1\\r\\n*0\\r\\n\n", id)
return
}
}
}(i)
}
// Writer adding messages concurrently
go func() {
conn, _ := net.Dial("tcp", host)
defer conn.Close()
for i := 0; i < 200; i++ {
sendCommand(conn, "XADD", stream, "*", "msg",
strconv.Itoa(i))
time.Sleep(5 * time.Millisecond)
}
}()
wg.Wait()
// Cleanup
conn, _ = net.Dial("tcp", host)
sendCommand(conn, "DEL", stream)
conn.Close()
if len(errors) > 0 {
fmt.Printf("\nBUG REPRODUCED: %d occurrence(s)\n",
len(errors))
os.Exit(1)
}
fmt.Println("No bug detected this run (try again - it's a race condition)")
}
```
</details>
The go-redis client detects this and logs:
```
redis: pool.go:768: Conn has unread data (not push notification), removing
it
```
I have verified locally that this fix eliminates the bug completely.
## Test
Added a test case `XREADGROUP BLOCK concurrent readers should not
receive double responses` in
`tests/gocase/unit/type/stream/stream_test.go` that exercises concurrent
blocking readers with concurrent writers.
## Type of change
- [x] Bug fix (non-breaking change which fixes an issue)
Resolves #3338
---
src/commands/cmd_stream.cc | 2 +
tests/gocase/unit/type/stream/stream_test.go | 66 ++++++++++++++++++++++++++++
2 files changed, 68 insertions(+)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f4c04a21c..233d8475f 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1353,6 +1353,7 @@ class CommandXRead : public Commander,
if (results.empty()) {
conn_->Reply(conn_->NilArray());
+ return;
}
SendReply(results);
@@ -1664,6 +1665,7 @@ class CommandXReadGroup : public Commander,
if (results.empty()) {
conn_->Reply(redis::MultiLen(-1));
+ return;
}
SendReply(results);
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index e97cde87e..4593d3c27 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2361,6 +2361,72 @@ func TestStreamOffset(t *testing.T) {
require.Greater(t, pendingEntry.Idle, time.Millisecond)
require.Less(t, pendingEntry.Idle, 10*time.Second)
})
+
+ t.Run("XREADGROUP BLOCK concurrent readers should not receive double responses", func(t *testing.T) {
+ streamName := "test-concurrent-xreadgroup-block"
+ groupName := "test-group"
+
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err())
+
+ // Create multiple clients to simulate concurrent blocking readers
+ numReaders := 5
+ numIterations := 20
+ clients := make([]*redis.Client, numReaders)
+ for i := 0; i < numReaders; i++ {
+ clients[i] = srv.NewClient()
+ defer clients[i].Close()
+ }
+
+ errorCh := make(chan error, numReaders*numIterations)
+ doneCh := make(chan bool, numReaders)
+
+ // Start concurrent blocking readers
+ for i := 0; i < numReaders; i++ {
+ go func(id int) {
+ defer func() { doneCh <- true }()
+ for j := 0; j < numIterations; j++ {
+ _, err := clients[id].XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group:    groupName,
+ Consumer: fmt.Sprintf("reader%d", id),
+ Streams:  []string{streamName, ">"},
+ Count: 1,
+ Block: 50 * time.Millisecond,
+ }).Result()
+ // redis.Nil is expected when block times out with no data
+ if err != nil && err != redis.Nil {
+ errorCh <- err
+ }
+ }
+ }(i)
+ }
+
+ // Concurrently add messages while readers are blocking
+ go func() {
+ for i := 0; i < 100; i++ {
+ rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ Values: []string{"msg", strconv.Itoa(i)},
+ })
+ time.Sleep(5 * time.Millisecond)
+ }
+ }()
+
+ // Wait for all readers to complete
+ for i := 0; i < numReaders; i++ {
+ <-doneCh
+ }
+ close(errorCh)
+
+ // Collect any errors - there should be none
+ var errors []error
+ for err := range errorCh {
+ errors = append(errors, err)
+ }
+ require.Empty(t, errors, "XREADGROUP BLOCK should not produce protocol errors under concurrent load")
+
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {