nsweeting opened a new issue, #3338: URL: https://github.com/apache/kvrocks/issues/3338
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/kvrocks/issues) and found no similar issues.

### Version

```
kvrocks_version: unstable
kvrocks_git_sha1: 71b83765
OS: macOS Darwin 24.6.0 (also tested via Docker)
Docker image: apache/kvrocks:nightly
```

### Minimal reproduce step

When multiple clients issue `XREADGROUP BLOCK` on the same stream while a writer is adding messages, Kvrocks sometimes sends two RESP responses for a single command.

**Reproduction using raw sockets (Go):**

```go
// Build: go build -o repro repro.go
// Run:   ./repro localhost:6666
package main

import (
    "bytes"
    "fmt"
    "net"
    "os"
    "strconv"
    "sync"
    "time"
)

var host = "localhost:6666"

// encodeCommand serializes args as a RESP array of bulk strings.
func encodeCommand(args ...string) []byte {
    var buf bytes.Buffer
    buf.WriteString(fmt.Sprintf("*%d\r\n", len(args)))
    for _, arg := range args {
        buf.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg))
    }
    return buf.Bytes()
}

// sendCommand writes one command and reads one chunk of reply bytes.
func sendCommand(conn net.Conn, args ...string) ([]byte, error) {
    if _, err := conn.Write(encodeCommand(args...)); err != nil {
        return nil, err
    }
    buf := make([]byte, 4096)
    conn.SetReadDeadline(time.Now().Add(5 * time.Second))
    n, err := conn.Read(buf)
    return buf[:n], err
}

func main() {
    if len(os.Args) > 1 {
        host = os.Args[1]
    }
    stream, group := "test_stream", "test_group"

    // Setup
    conn, _ := net.Dial("tcp", host)
    sendCommand(conn, "DEL", stream)
    sendCommand(conn, "XGROUP", "CREATE", stream, group, "0", "MKSTREAM")
    conn.Close()

    var errors []string
    var mu sync.Mutex
    var wg sync.WaitGroup

    // 5 concurrent blocking readers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            conn, _ := net.Dial("tcp", host)
            defer conn.Close()
            for j := 0; j < 100; j++ {
                cmd := encodeCommand("XREADGROUP", "GROUP", group,
                    fmt.Sprintf("reader%d", id),
                    "COUNT", "1", "BLOCK", "50", "STREAMS", stream, ">")
                conn.Write(cmd)
                buf := make([]byte, 4096)
                conn.SetReadDeadline(time.Now().Add(2 * time.Second))
                n, _ := conn.Read(buf)
                if bytes.Contains(buf[:n], []byte("*-1\r\n*0\r\n")) {
                    mu.Lock()
                    errors = append(errors, fmt.Sprintf("Reader %d: got *-1\\r\\n*0\\r\\n", id))
                    mu.Unlock()
                    fmt.Printf("[BUG] Reader %d: received double response *-1\\r\\n*0\\r\\n\n", id)
                    return
                }
            }
        }(i)
    }

    // Writer adding messages concurrently
    go func() {
        conn, _ := net.Dial("tcp", host)
        defer conn.Close()
        for i := 0; i < 200; i++ {
            sendCommand(conn, "XADD", stream, "*", "msg", strconv.Itoa(i))
            time.Sleep(5 * time.Millisecond)
        }
    }()

    wg.Wait()

    // Cleanup
    conn, _ = net.Dial("tcp", host)
    sendCommand(conn, "DEL", stream)
    conn.Close()

    if len(errors) > 0 {
        fmt.Printf("\nBUG REPRODUCED: %d occurrence(s)\n", len(errors))
        os.Exit(1)
    }
    fmt.Println("No bug detected this run (try again - it's a race condition)")
}
```

**Steps:**

1. Start Kvrocks
2. Run the reproduction script
3. The bug triggers almost immediately under concurrent load

### What did you expect to see?

Each `XREADGROUP BLOCK` command should receive exactly **one** RESP response:

- `*-1\r\n` (a RESP2 null array, the encoding visible in the output below) when the block times out with no data, OR
- `*N\r\n...` (array) containing the stream data

### What did you see instead?

Under concurrent load, Kvrocks sends **two responses** for a single command:

```
*-1\r\n*0\r\n
```

This is:

- `*-1\r\n` - a null array
- `*0\r\n` - an empty array

This causes protocol desynchronization: every subsequent reply on the connection is matched against the wrong command.
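For what it's worth, the substring check in the repro only catches the case where both replies land in a single `Read`. Below is a frame-accurate variant (my own sketch, not part of the repro: the file name, the `checker` consumer name, and the port are invented, and it assumes the stream and group created by the repro still exist). It parses exactly one complete RESP2 reply and then treats any further bytes as a second reply, which is sound because `*-1\r\n` is already a complete reply on its own.

```go
// resp_check.go - frame-accurate single-reply check (hypothetical helper,
// not part of the repro above; assumes test_stream/test_group already exist).
package main

import (
    "bufio"
    "fmt"
    "io"
    "net"
    "strconv"
    "strings"
    "time"
)

// encode serializes args as a RESP array of bulk strings.
func encode(args ...string) []byte {
    var b strings.Builder
    fmt.Fprintf(&b, "*%d\r\n", len(args))
    for _, a := range args {
        fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(a), a)
    }
    return []byte(b.String())
}

// readReply consumes exactly one complete RESP2 reply from br.
func readReply(br *bufio.Reader) error {
    line, err := br.ReadString('\n')
    if err != nil {
        return err
    }
    switch line[0] {
    case '+', '-', ':': // simple string, error, integer: single line
        return nil
    case '$': // bulk string; $-1 is a null with no payload
        n, _ := strconv.Atoi(strings.TrimSpace(line[1:]))
        if n < 0 {
            return nil
        }
        _, err := io.CopyN(io.Discard, br, int64(n)+2) // payload + \r\n
        return err
    case '*': // array; *-1 (null) and *0 (empty) have no elements
        n, _ := strconv.Atoi(strings.TrimSpace(line[1:]))
        for i := 0; i < n; i++ {
            if err := readReply(br); err != nil {
                return err
            }
        }
        return nil
    }
    return fmt.Errorf("unexpected RESP type byte %q", line[0])
}

func main() {
    conn, err := net.Dial("tcp", "localhost:6666")
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    br := bufio.NewReader(conn)

    conn.Write(encode("XREADGROUP", "GROUP", "test_group", "checker",
        "COUNT", "1", "BLOCK", "50", "STREAMS", "test_stream", ">"))

    conn.SetReadDeadline(time.Now().Add(2 * time.Second))
    if err := readReply(br); err != nil {
        panic(err)
    }
    // One command was sent, so after one complete reply the connection
    // must stay silent until the next command.
    if br.Buffered() > 0 {
        fmt.Println("[BUG] extra bytes already buffered after a complete reply")
    }
    conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
    if readReply(br) == nil {
        fmt.Println("[BUG] a second reply arrived for a single command")
    }
}
```

The `Buffered()` check catches the coalesced case (both replies in one TCP segment), while the short-deadline second read catches the split case that the substring check would miss.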
The go-redis client library detects the stray reply and logs:

```
redis: pool.go:768: Conn has unread data (not push notification), removing it
```

Example output from the reproduction script:

```
[BUG] Reader 3: received double response *-1\r\n*0\r\n
[BUG] Reader 2: received double response *-1\r\n*0\r\n
[BUG] Reader 4: received double response *-1\r\n*0\r\n
[BUG] Reader 0: received double response *-1\r\n*0\r\n

BUG REPRODUCED: 4 occurrence(s)
```

### Anything Else?

The bug appears to be a race condition in the handling of blocking commands:

1. Multiple clients are blocked waiting on `XREADGROUP BLOCK`
2. A message is added via `XADD`
3. Kvrocks appears to send both a "no data" (timeout) response and a separate wakeup response, rather than a single atomic response

A minimal model of this interleaving is sketched at the end of this report.

**Impact:** Redis client libraries without defensive handling for unexpected extra responses will crash or corrupt their connection state.

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
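To make step 3 above concrete: the symptom is consistent with the BLOCK timeout path and the XADD wakeup path both writing a reply when they fire at nearly the same instant. The sketch below is a minimal Go model of that interleaving and of a reply-once guard, using invented names throughout; Kvrocks itself is C++, so this illustrates the suspected race, not the actual code or the actual fix.

```go
// Minimal Go model of the suspected race (illustrative only; Kvrocks is C++
// and all names here are invented). A blocked XREADGROUP can be resolved two
// ways: the BLOCK timeout fires, or an XADD wakes it. If both paths write a
// reply, the client sees *-1\r\n followed by *0\r\n for one command.
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type blockedClient struct {
    replied atomic.Bool // reply-once guard: the first CAS winner replies
    out     chan string // stands in for the client connection
}

// reply writes at most one response per blocked command; the losing
// resolver path does nothing.
func (c *blockedClient) reply(resp string) {
    if c.replied.CompareAndSwap(false, true) {
        c.out <- resp
    }
}

func main() {
    c := &blockedClient{out: make(chan string, 2)}
    var wg sync.WaitGroup
    wg.Add(2)

    // Timeout path: BLOCK 50 expired with no data.
    go func() {
        defer wg.Done()
        time.Sleep(50 * time.Millisecond)
        c.reply("*-1\r\n") // null reply on timeout
    }()

    // Wakeup path: an XADD arrived at (almost) the same instant.
    go func() {
        defer wg.Done()
        time.Sleep(50 * time.Millisecond)
        c.reply("*0\r\n") // wakeup reply (empty here for brevity)
    }()

    wg.Wait()
    close(c.out)
    for r := range c.out {
        fmt.Printf("sent: %q\n", r) // exactly one reply with the guard
    }
}
```

Without the `CompareAndSwap` guard, both goroutines write and the connection carries `*-1\r\n*0\r\n`, exactly the byte sequence in the repro output.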
