nsweeting opened a new pull request, #3337:
URL: https://github.com/apache/kvrocks/pull/3337
## Search before asking
- [x] I have searched the issues and found no similar issues.
## What does this PR do?
When the `OnWrite()` callback for blocking `XREAD`/`XREADGROUP` commands
receives empty results, it sends a null array response but then unconditionally
calls `SendReply()`, which sends an additional empty array response.
This causes clients to receive two RESP responses (`*-1\r\n*0\r\n`) for a
single command, leading to protocol desynchronization.
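The following Go sketch makes the desynchronization concrete. It is not kvrocks or go-redis code. It models a client that matches replies to commands in order, as pipelining clients do. `readReply` (a minimal RESP2 reader that handles only the reply types needed here) and `pairReplies` are hypothetical helpers. With the buggy byte stream, the stray `*0\r\n` is consumed as the reply to the *next* command (`PING` here), and the real `+PONG` is left unread on the connection.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// readReply consumes exactly one RESP2 reply from r and returns a short
// description of it. Only the reply types needed for this demo are handled.
func readReply(r *bufio.Reader) (string, error) {
	line, err := r.ReadString('\n')
	if err != nil {
		return "", err
	}
	line = strings.TrimSuffix(line, "\r\n")
	if line == "" {
		return "", fmt.Errorf("empty reply line")
	}
	switch line[0] {
	case '+', '-', ':':
		return line, nil
	case '$':
		n, err := strconv.Atoi(line[1:])
		if err != nil {
			return "", err
		}
		if n < 0 {
			return "nil bulk", nil
		}
		buf := make([]byte, n+2) // payload plus trailing \r\n
		if _, err := io.ReadFull(r, buf); err != nil {
			return "", err
		}
		return "bulk " + string(buf[:n]), nil
	case '*':
		n, err := strconv.Atoi(line[1:])
		if err != nil {
			return "", err
		}
		if n < 0 {
			return "nil array", nil
		}
		// Consume (and discard) each nested element.
		for i := 0; i < n; i++ {
			if _, err := readReply(r); err != nil {
				return "", err
			}
		}
		return fmt.Sprintf("array of %d", n), nil
	}
	return "", fmt.Errorf("unknown reply type %q", line[0])
}

// pairReplies reads one reply per command from the raw server output, in
// order, and labels each reply with the command it is attributed to.
func pairReplies(raw string, cmds []string) ([]string, error) {
	r := bufio.NewReader(strings.NewReader(raw))
	out := make([]string, 0, len(cmds))
	for _, c := range cmds {
		rep, err := readReply(r)
		if err != nil {
			return nil, err
		}
		out = append(out, c+" -> "+rep)
	}
	return out, nil
}

func main() {
	cmds := []string{"XREADGROUP ... BLOCK 50", "PING"}
	// Buggy server: a timed-out XREADGROUP emits *-1 followed by a stray *0.
	buggy := "*-1\r\n*0\r\n+PONG\r\n"
	// Fixed server: exactly one reply per command.
	fixed := "*-1\r\n+PONG\r\n"
	for _, raw := range []string{buggy, fixed} {
		pairs, err := pairReplies(raw, cmds)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(pairs)
	}
}
```

Because replies carry no request ID, a client has no way to notice the shift until a reply has the wrong type for the command it was matched to.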
### Root Cause
**CommandXReadGroup::OnWrite() - lines 1666-1670:**
```cpp
if (results.empty()) {
  conn_->Reply(redis::MultiLen(-1));  // Sends *-1\r\n
  // BUG: missing return here!
}
SendReply(results);  // Always called, sends *0\r\n when results is empty
```
**CommandXRead::OnWrite() - lines 1354-1358:**
```cpp
if (results.empty()) {
  conn_->Reply(conn_->NilArray());  // Sends *-1\r\n
  // BUG: missing return here!
}
SendReply(results);  // Always called, sends *0\r\n when results is empty
```
### Fix
Add `return;` statements after sending the empty response to prevent the
duplicate `SendReply()` call.
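The actual change is in C++ and depends on kvrocks internals, so the Go model below mirrors only the control flow. `sendReply`, `onWriteBuggy`, `onWriteFixed` and `render` are hypothetical names, and results are simplified to a flat list of bulk strings rather than the nested stream reply. The point is only that the early return leaves exactly one reply on the wire for an empty result set.

```go
package main

import (
	"fmt"
	"strings"
)

// sendReply mimics SendReply(): it always writes an array header, so an
// empty result set becomes "*0\r\n". Entries are simplified to bulk strings.
func sendReply(w *strings.Builder, results []string) {
	fmt.Fprintf(w, "*%d\r\n", len(results))
	for _, r := range results {
		fmt.Fprintf(w, "$%d\r\n%s\r\n", len(r), r)
	}
}

// onWriteBuggy reproduces the original control flow: the null-array branch
// falls through into sendReply.
func onWriteBuggy(w *strings.Builder, results []string) {
	if len(results) == 0 {
		w.WriteString("*-1\r\n")
	}
	sendReply(w, results)
}

// onWriteFixed adds the early return, so each call writes exactly one reply.
func onWriteFixed(w *strings.Builder, results []string) {
	if len(results) == 0 {
		w.WriteString("*-1\r\n")
		return
	}
	sendReply(w, results)
}

// render runs one handler and returns everything it wrote.
func render(f func(*strings.Builder, []string), results []string) string {
	var w strings.Builder
	f(&w, results)
	return w.String()
}

func main() {
	fmt.Printf("buggy, empty:     %q\n", render(onWriteBuggy, nil))
	fmt.Printf("fixed, empty:     %q\n", render(onWriteFixed, nil))
	fmt.Printf("fixed, one entry: %q\n", render(onWriteFixed, []string{"1-0"}))
}
```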
## Reproduction
The bug triggers under concurrent load when multiple clients are blocking on
`XREADGROUP BLOCK` while messages are being added via `XADD`.
<details>
<summary>Reproduction script (Go)</summary>
```go
// Build: go build -o repro repro.go
// Run:   ./repro localhost:6666
package main

import (
	"bytes"
	"fmt"
	"net"
	"os"
	"strconv"
	"sync"
	"time"
)

var host = "localhost:6666"

// encodeCommand serializes args as a RESP array of bulk strings.
func encodeCommand(args ...string) []byte {
	var buf bytes.Buffer
	fmt.Fprintf(&buf, "*%d\r\n", len(args))
	for _, arg := range args {
		fmt.Fprintf(&buf, "$%d\r\n%s\r\n", len(arg), arg)
	}
	return buf.Bytes()
}

// sendCommand writes one command and performs a single read of the reply.
// This is enough for the short replies used here; it is not a full RESP reader.
func sendCommand(conn net.Conn, args ...string) ([]byte, error) {
	if _, err := conn.Write(encodeCommand(args...)); err != nil {
		return nil, err
	}
	buf := make([]byte, 4096)
	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
	n, err := conn.Read(buf)
	return buf[:n], err
}

// dial connects to the server and exits if the connection fails.
func dial() net.Conn {
	conn, err := net.Dial("tcp", host)
	if err != nil {
		fmt.Fprintln(os.Stderr, "dial:", err)
		os.Exit(2)
	}
	return conn
}

func main() {
	if len(os.Args) > 1 {
		host = os.Args[1]
	}
	stream, group := "test_stream", "test_group"

	// Setup: recreate the stream and the consumer group.
	conn := dial()
	sendCommand(conn, "DEL", stream)
	sendCommand(conn, "XGROUP", "CREATE", stream, group, "0", "MKSTREAM")
	conn.Close()

	var errors []string
	var mu sync.Mutex
	var wg sync.WaitGroup

	// 5 concurrent blocking readers.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			conn := dial()
			defer conn.Close()
			for j := 0; j < 100; j++ {
				cmd := encodeCommand("XREADGROUP", "GROUP", group,
					fmt.Sprintf("reader%d", id), "COUNT", "1", "BLOCK", "50",
					"STREAMS", stream, ">")
				conn.Write(cmd)
				buf := make([]byte, 4096)
				conn.SetReadDeadline(time.Now().Add(2 * time.Second))
				n, _ := conn.Read(buf)
				// Only catches the case where both replies arrive in the same read.
				if bytes.Contains(buf[:n], []byte("*-1\r\n*0\r\n")) {
					mu.Lock()
					errors = append(errors, fmt.Sprintf("Reader %d: got *-1\\r\\n*0\\r\\n", id))
					mu.Unlock()
					fmt.Printf("[BUG] Reader %d: received double response *-1\\r\\n*0\\r\\n\n", id)
					return
				}
			}
		}(i)
	}

	// Writer adding messages concurrently.
	go func() {
		conn := dial()
		defer conn.Close()
		for i := 0; i < 200; i++ {
			sendCommand(conn, "XADD", stream, "*", "msg", strconv.Itoa(i))
			time.Sleep(5 * time.Millisecond)
		}
	}()

	wg.Wait()

	// Cleanup.
	conn = dial()
	sendCommand(conn, "DEL", stream)
	conn.Close()

	if len(errors) > 0 {
		fmt.Printf("\nBUG REPRODUCED: %d occurrence(s)\n", len(errors))
		os.Exit(1)
	}
	fmt.Println("No bug detected this run (try again - it's a race condition)")
}
```
</details>
The go-redis client detects this and logs:
```
redis: pool.go:768: Conn has unread data (not push notification), removing it
```
I have verified locally that, with this fix applied, the reproduction script no longer reports double responses.
## Test
Added a test case `XREADGROUP BLOCK concurrent readers should not receive double responses`
in `tests/gocase/unit/type/stream/stream_test.go` that exercises concurrent blocking
readers with concurrent writers.
## Type of change
- [x] Bug fix (non-breaking change which fixes an issue)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.