nsweeting opened a new pull request, #3337:
URL: https://github.com/apache/kvrocks/pull/3337

   ## Search before asking
   
   - [x] I have searched the issues and found no similar issues.
   
   ## What does this PR do?
   
   When the `OnWrite()` callback for blocking `XREAD`/`XREADGROUP` commands 
receives empty results, it sends a null array response but then unconditionally 
calls `SendReply()`, which sends an additional empty array response.
   
   This causes clients to receive two RESP responses (`*-1\r\n*0\r\n`) for a 
single command, leading to protocol desynchronization.
   
   ### Root Cause
   
   **CommandXReadGroup::OnWrite() - lines 1666-1670:**
   ```cpp
   if (results.empty()) {
     conn_->Reply(redis::MultiLen(-1));  // Sends *-1\r\n
     // BUG: missing return here!
   }
   
   SendReply(results);  // Always called, sends *0\r\n when results is empty
   ```
   
   **CommandXRead::OnWrite() - lines 1354-1358:**
   ```cpp
   if (results.empty()) {
     conn_->Reply(conn_->NilArray());  // Sends *-1\r\n
     // BUG: missing return here!
   }
   
   SendReply(results);  // Always called, sends *0\r\n when results is empty
   ```
   
   ### Fix
   
   Add `return;` statements after sending the empty response to prevent the 
duplicate `SendReply()` call.
   
   ## Reproduction
   
   The bug is triggered under concurrent load, when multiple clients are blocked in
`XREADGROUP BLOCK` while other clients add messages via `XADD`.
   
   <details>
   <summary>Reproduction script (Go)</summary>
   
   ```go
   // Build: go build -o repro repro.go
   // Run:   ./repro localhost:6666
   
   package main
   
   import (
        "bytes"
        "fmt"
        "net"
        "os"
        "strconv"
        "sync"
        "time"
   )
   
   // defaultHost is the server address used when no command-line argument is given.
   const defaultHost = "localhost:6666"

   // host is the address of the server under test; main replaces it with the
   // first command-line argument when one is supplied.
   var host = defaultHost
   
   // encodeCommand serializes args as a RESP array of bulk strings: a
   // "*<count>\r\n" header followed by "$<byte length>\r\n<arg>\r\n" per argument.
   func encodeCommand(args ...string) []byte {
        var out bytes.Buffer
        fmt.Fprintf(&out, "*%d\r\n", len(args))
        for i := range args {
                // len reports bytes, which is what the bulk-string header requires.
                fmt.Fprintf(&out, "$%d\r\n%s\r\n", len(args[i]), args[i])
        }
        return out.Bytes()
   }
   
   // sendCommand writes one RESP-encoded command to conn and returns the bytes
   // produced by a single Read, waiting at most 5 seconds for them.
   //
   // Only one Read is issued, so a reply longer than 4096 bytes, or one that
   // arrives in several TCP segments, is returned truncated. Failures to write
   // the command, set the deadline, or read the reply are returned as wrapped
   // errors (with nil data) instead of being silently dropped.
   func sendCommand(conn net.Conn, args ...string) ([]byte, error) {
        if _, err := conn.Write(encodeCommand(args...)); err != nil {
                return nil, fmt.Errorf("writing command: %w", err)
        }
        if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
                return nil, fmt.Errorf("setting read deadline: %w", err)
        }
        buf := make([]byte, 4096)
        n, err := conn.Read(buf)
        if err != nil {
                return nil, fmt.Errorf("reading reply: %w", err)
        }
        return buf[:n], nil
   }
   
   func main() {
        if len(os.Args) > 1 {
                host = os.Args[1]
        }
   
        stream, group := "test_stream", "test_group"
   
        // Setup
        conn, _ := net.Dial("tcp", host)
        sendCommand(conn, "DEL", stream)
        sendCommand(conn, "XGROUP", "CREATE", stream, group, "0", "MKSTREAM")
        conn.Close()
   
        var errors []string
        var mu sync.Mutex
        var wg sync.WaitGroup
   
        // 5 concurrent blocking readers
        for i := 0; i < 5; i++ {
                wg.Add(1)
                go func(id int) {
                        defer wg.Done()
                        conn, _ := net.Dial("tcp", host)
                        defer conn.Close()
   
                        for j := 0; j < 100; j++ {
                                cmd := encodeCommand("XREADGROUP", "GROUP", 
group, fmt.Sprintf("reader%d", id),
                                        "COUNT", "1", "BLOCK", "50", "STREAMS", 
stream, ">")
                                conn.Write(cmd)
   
                                buf := make([]byte, 4096)
                                conn.SetReadDeadline(time.Now().Add(2 * 
time.Second))
                                n, _ := conn.Read(buf)
   
                                if bytes.Contains(buf[:n], 
[]byte("*-1\r\n*0\r\n")) {
                                        mu.Lock()
                                        errors = append(errors, 
fmt.Sprintf("Reader %d: got *-1\\r\\n*0\\r\\n", id))
                                        mu.Unlock()
                                        fmt.Printf("[BUG] Reader %d: received 
double response *-1\\r\\n*0\\r\\n\n", id)
                                        return
                                }
                        }
                }(i)
        }
   
        // Writer adding messages concurrently
        go func() {
                conn, _ := net.Dial("tcp", host)
                defer conn.Close()
                for i := 0; i < 200; i++ {
                        sendCommand(conn, "XADD", stream, "*", "msg", 
strconv.Itoa(i))
                        time.Sleep(5 * time.Millisecond)
                }
        }()
   
        wg.Wait()
   
        // Cleanup
        conn, _ = net.Dial("tcp", host)
        sendCommand(conn, "DEL", stream)
        conn.Close()
   
        if len(errors) > 0 {
                fmt.Printf("\nBUG REPRODUCED: %d occurrence(s)\n", len(errors))
                os.Exit(1)
        }
        fmt.Println("No bug detected this run (try again - it's a race 
condition)")
   }
   ```
   
   </details>
   
   The go-redis client detects this and logs:
   ```
   redis: pool.go:768: Conn has unread data (not push notification), removing it
   ```
   
   I have verified locally that this fix eliminates the bug completely.
   
   ## Test
   
   Added a test case `XREADGROUP BLOCK concurrent readers should not receive 
double responses` in `tests/gocase/unit/type/stream/stream_test.go` that 
exercises concurrent blocking readers with concurrent writers.
   
   ## Type of change
   
   - [x] Bug fix (non-breaking change which fixes an issue)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to