nsweeting opened a new issue, #3338:
URL: https://github.com/apache/kvrocks/issues/3338

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/kvrocks/issues) and found no similar issues.
   
   
   ### Version
   
   ```
   kvrocks_version: unstable
   kvrocks_git_sha1: 71b83765
   OS: macOS Darwin 24.6.0 (also tested via Docker)
   Docker image: apache/kvrocks:nightly
   ```
   
   ### Minimal reproduce step
   
   When multiple clients issue `XREADGROUP BLOCK` on the same stream while a 
writer is adding messages, Kvrocks sometimes sends two RESP responses for a 
single command.
   
   **Reproduction using raw sockets (Go):**
   
   ```go
   // Build: go build -o repro repro.go
   // Run:   ./repro localhost:6666
   
   package main
   
   import (
        "bytes"
        "fmt"
        "net"
        "os"
        "strconv"
        "sync"
        "time"
   )
   
   var host = "localhost:6666"
   
   // encodeCommand serializes args as a RESP array of bulk strings: a
   // "*<count>\r\n" header followed by "$<byte length>\r\n<arg>\r\n" per argument.
   func encodeCommand(args ...string) []byte {
        out := new(bytes.Buffer)
        fmt.Fprintf(out, "*%d\r\n", len(args))
        for i := range args {
                fmt.Fprintf(out, "$%d\r\n%s\r\n", len(args[i]), args[i])
        }
        return out.Bytes()
   }
   
   // sendCommand writes one RESP-encoded command to conn and returns the bytes
   // obtained from a single Read of up to 4096 bytes, with a 5 second read
   // deadline.
   //
   // Only one Read is issued, so a reply that is larger than the buffer or that
   // arrives in several pieces is returned partially. A failure to write, to set
   // the deadline, or to read is reported through the error result (with a nil
   // slice) instead of being silently dropped.
   func sendCommand(conn net.Conn, args ...string) ([]byte, error) {
        if _, err := conn.Write(encodeCommand(args...)); err != nil {
                return nil, fmt.Errorf("writing command: %w", err)
        }
        if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
                return nil, fmt.Errorf("setting read deadline: %w", err)
        }
        buf := make([]byte, 4096)
        n, err := conn.Read(buf)
        if err != nil {
                return nil, fmt.Errorf("reading reply: %w", err)
        }
        return buf[:n], nil
   }
   
   func main() {
        if len(os.Args) > 1 {
                host = os.Args[1]
        }
   
        stream, group := "test_stream", "test_group"
   
        // Setup
        conn, _ := net.Dial("tcp", host)
        sendCommand(conn, "DEL", stream)
        sendCommand(conn, "XGROUP", "CREATE", stream, group, "0", "MKSTREAM")
        conn.Close()
   
        var errors []string
        var mu sync.Mutex
        var wg sync.WaitGroup
   
        // 5 concurrent blocking readers
        for i := 0; i < 5; i++ {
                wg.Add(1)
                go func(id int) {
                        defer wg.Done()
                        conn, _ := net.Dial("tcp", host)
                        defer conn.Close()
   
                        for j := 0; j < 100; j++ {
                                cmd := encodeCommand("XREADGROUP", "GROUP", 
group, fmt.Sprintf("reader%d", id),
                                        "COUNT", "1", "BLOCK", "50", "STREAMS", 
stream, ">")
                                conn.Write(cmd)
   
                                buf := make([]byte, 4096)
                                conn.SetReadDeadline(time.Now().Add(2 * 
time.Second))
                                n, _ := conn.Read(buf)
   
                                if bytes.Contains(buf[:n], 
[]byte("*-1\r\n*0\r\n")) {
                                        mu.Lock()
                                        errors = append(errors, 
fmt.Sprintf("Reader %d: got *-1\\r\\n*0\\r\\n", id))
                                        mu.Unlock()
                                        fmt.Printf("[BUG] Reader %d: received 
double response *-1\\r\\n*0\\r\\n\n", id)
                                        return
                                }
                        }
                }(i)
        }
   
        // Writer adding messages concurrently
        go func() {
                conn, _ := net.Dial("tcp", host)
                defer conn.Close()
                for i := 0; i < 200; i++ {
                        sendCommand(conn, "XADD", stream, "*", "msg", 
strconv.Itoa(i))
                        time.Sleep(5 * time.Millisecond)
                }
        }()
   
        wg.Wait()
   
        // Cleanup
        conn, _ = net.Dial("tcp", host)
        sendCommand(conn, "DEL", stream)
        conn.Close()
   
        if len(errors) > 0 {
                fmt.Printf("\nBUG REPRODUCED: %d occurrence(s)\n", len(errors))
                os.Exit(1)
        }
        fmt.Println("No bug detected this run (try again - it's a race 
condition)")
   }
   ```
   
   **Steps:**
   1. Start Kvrocks
   2. Run the reproduction script
   3. Bug triggers almost immediately under concurrent load
   
   ### What did you expect to see?
   
   Each `XREADGROUP BLOCK` command should receive exactly **one** RESP response:
   - `*-1\r\n` (null array) when the block times out with no data, OR
   - `*N\r\n...` (array) containing the stream data
   
   ### What did you see instead?
   
   Under concurrent load, Kvrocks sends **two responses** for a single command:
   ```
   *-1\r\n*0\r\n
   ```
   
   This is:
   - `*-1\r\n` - null array
   - `*0\r\n` - empty array
   
   This causes protocol desynchronization. The go-redis client library detects 
this and logs:
   ```
   redis: pool.go:768: Conn has unread data (not push notification), removing it
   ```
   
   Example output from reproduction script:
   ```
   [BUG] Reader 3: received double response *-1\r\n*0\r\n
   [BUG] Reader 2: received double response *-1\r\n*0\r\n
   [BUG] Reader 4: received double response *-1\r\n*0\r\n
   [BUG] Reader 0: received double response *-1\r\n*0\r\n
   
   BUG REPRODUCED: 4 occurrence(s)
   ```
   
   ### Anything Else?
   
   The bug appears to be a race condition in the blocking command handling when:
   1. Multiple clients are blocked waiting on `XREADGROUP BLOCK`
   2. A message is added via `XADD`
   3. Kvrocks appears to send both the timeout ("no data") reply `*-1\r\n` and a 
separate empty-array reply `*0\r\n` to the same client, rather than exactly one 
reply per command
   
   **Impact:** Redis client libraries that don't have defensive handling for 
unexpected extra responses will crash or corrupt their connection state.
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to