The GitHub Actions job "SonarCloud Analysis" on kvrocks.git/unstable has succeeded. Run started by GitHub user PragmaTwice (triggered by PragmaTwice).
Head commit for run: 25216331bce6f9e97b945aad1cbb04e2db261287 / Nick Sweeting <[email protected]> fix(stream): add missing return after empty response in XREAD/XREADGROUP OnWrite (#3337) When the `OnWrite()` callback for blocking `XREAD`/`XREADGROUP` commands receives empty results, it sends a null array response but then unconditionally calls `SendReply()`, which sends an additional empty array response. This causes clients to receive two RESP responses (`*-1\r\n*0\r\n`) for a single command, leading to protocol desynchronization. ### Root Cause **CommandXReadGroup::OnWrite() - lines 1666-1670:** ```cpp if (results.empty()) { conn_->Reply(redis::MultiLen(-1)); // Sends *-1\r\n // BUG: missing return here! } SendReply(results); // Always called, sends *0\r\n when results is empty ``` **CommandXRead::OnWrite() - lines 1354-1358:** ```cpp if (results.empty()) { conn_->Reply(conn_->NilArray()); // Sends *-1\r\n // BUG: missing return here! } SendReply(results); // Always called, sends *0\r\n when results is empty ``` ### Fix Add `return;` statements after sending the empty response to prevent the duplicate `SendReply()` call. ## Reproduction The bug triggers under concurrent load when multiple clients are blocking on `XREADGROUP BLOCK` while messages are being added via `XADD`. <details> <summary>Reproduction script (Go)</summary> ```go // Build: go build -o repro repro.go // Run: ./repro localhost:6666 package main import ( "bytes" "fmt" "net" "os" "strconv" "sync" "time" ) var host = "localhost:6666" func encodeCommand(args ...string) []byte { var buf bytes.Buffer buf.WriteString(fmt.Sprintf("*%d\r\n", len(args))) for _, arg := range args { buf.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)) } return buf.Bytes() } func sendCommand(conn net.Conn, args ...string) ([]byte, error) { conn.Write(encodeCommand(args...)) buf := make([]byte, 4096) conn.SetReadDeadline(time.Now().Add(5 * time.Second)) n, _ := conn.Read(buf) return buf[:n], nil } func main() { if len(os.Args) > 1 { host = os.Args[1] } stream, group := "test_stream", "test_group" // Setup conn, _ := net.Dial("tcp", host) sendCommand(conn, "DEL", stream) sendCommand(conn, "XGROUP", "CREATE", stream, group, "0", "MKSTREAM") conn.Close() var errors []string var mu sync.Mutex var wg sync.WaitGroup // 5 concurrent blocking readers for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() conn, _ := net.Dial("tcp", host) defer conn.Close() for j := 0; j < 100; j++ { cmd := encodeCommand("XREADGROUP", "GROUP", group, fmt.Sprintf("reader%d", id), "COUNT", "1", "BLOCK", "50", "STREAMS", stream, ">") conn.Write(cmd) buf := make([]byte, 4096) conn.SetReadDeadline(time.Now().Add(2 * time.Second)) n, _ := conn.Read(buf) if bytes.Contains(buf[:n], []byte("*-1\r\n*0\r\n")) { mu.Lock() errors = append(errors, fmt.Sprintf("Reader %d: got *-1\\r\\n*0\\r\\n", id)) mu.Unlock() fmt.Printf("[BUG] Reader %d: received double response *-1\\r\\n*0\\r\\n\n", id) return } } }(i) } // Writer adding messages concurrently go func() { conn, _ := net.Dial("tcp", host) defer conn.Close() for i := 0; i < 200; i++ { sendCommand(conn, "XADD", stream, "*", "msg", strconv.Itoa(i)) time.Sleep(5 * time.Millisecond) } }() wg.Wait() // Cleanup conn, _ = net.Dial("tcp", host) sendCommand(conn, "DEL", stream) conn.Close() if len(errors) > 0 { fmt.Printf("\nBUG REPRODUCED: %d occurrence(s)\n", len(errors)) os.Exit(1) } fmt.Println("No bug detected this run (try again - it's a race condition)") } ``` </details> The go-redis client detects this and logs: ``` redis: pool.go:768: Conn has unread data (not push notification), removing it ``` I have verified locally that this fix eliminates the bug completely. ## Test Added a test case `XREADGROUP BLOCK concurrent readers should not receive double responses` in `tests/gocase/unit/type/stream/stream_test.go` that exercises concurrent blocking readers with concurrent writers. ## Type of change - [x] Bug fix (non-breaking change which fixes an issue) Resolves #3338 Report URL: https://github.com/apache/kvrocks/actions/runs/20846656216 With regards, GitHub Actions via GitBox
