The GitHub Actions job "SonarCloud Analysis" on kvrocks.git/unstable has 
succeeded.
Run started by GitHub user PragmaTwice (triggered by PragmaTwice).

Head commit for run:
25216331bce6f9e97b945aad1cbb04e2db261287 / Nick Sweeting <[email protected]>
fix(stream): add missing return after empty response in XREAD/XREADGROUP OnWrite (#3337)

When the `OnWrite()` callback for blocking `XREAD`/`XREADGROUP` commands
receives empty results, it sends a null array response but then
unconditionally calls `SendReply()`, which sends an additional empty
array response.

This causes clients to receive two RESP responses (`*-1\r\n*0\r\n`) for
a single command, leading to protocol desynchronization.
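
To make the desynchronization concrete, the following is a minimal sketch of a RESP2 reply counter. It is not taken from Kvrocks or from any client library; `replyLen`, `countReplies`, and `errIncomplete` are hypothetical names introduced only for illustration. It shows that the byte sequence above parses as two complete top-level replies rather than one.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"strconv"
)

// errIncomplete is a hypothetical sentinel for a truncated buffer.
var errIncomplete = errors.New("incomplete RESP reply")

// replyLen returns the number of bytes occupied by the first complete
// RESP2 reply in buf.
func replyLen(buf []byte) (int, error) {
	end := bytes.Index(buf, []byte("\r\n"))
	if end < 1 {
		return 0, errIncomplete
	}
	header, next := buf[1:end], end+2
	switch buf[0] {
	case '+', '-', ':': // simple string, error, integer: a single line
		return next, nil
	case '$': // bulk string: length line, then payload and CRLF
		n, err := strconv.Atoi(string(header))
		if err != nil {
			return 0, err
		}
		if n < 0 { // null bulk string has no payload
			return next, nil
		}
		if len(buf) < next+n+2 {
			return 0, errIncomplete
		}
		return next + n + 2, nil
	case '*': // array: count line, then that many nested replies
		n, err := strconv.Atoi(string(header))
		if err != nil {
			return 0, err
		}
		for i := 0; i < n; i++ { // n <= 0 covers null and empty arrays
			m, err := replyLen(buf[next:])
			if err != nil {
				return 0, err
			}
			next += m
		}
		return next, nil
	}
	return 0, fmt.Errorf("unknown RESP type byte %q", buf[0])
}

// countReplies counts the complete top-level replies in buf.
func countReplies(buf []byte) (int, error) {
	count := 0
	for len(buf) > 0 {
		n, err := replyLen(buf)
		if err != nil {
			return count, err
		}
		buf = buf[n:]
		count++
	}
	return count, nil
}

func main() {
	n, err := countReplies([]byte("*-1\r\n*0\r\n"))
	fmt.Println(n, err) // prints: 2 <nil>
}
```

A client that sent one command consumes only the first reply, so the second one is left on the connection and is misread as the answer to the next command.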

## Root Cause

**CommandXReadGroup::OnWrite() - lines 1666-1670:**
```cpp
if (results.empty()) {
  conn_->Reply(redis::MultiLen(-1));  // Sends *-1\r\n
  // BUG: missing return here!
}

SendReply(results);  // Always called, sends *0\r\n when results is empty
```

**CommandXRead::OnWrite() - lines 1354-1358:**
```cpp
if (results.empty()) {
  conn_->Reply(conn_->NilArray());  // Sends *-1\r\n
  // BUG: missing return here!
}

SendReply(results);  // Always called, sends *0\r\n when results is empty
```

## Fix

Add `return;` statements after sending the empty response to prevent the
duplicate `SendReply()` call.
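
The effect of the early return can be modelled outside the server. The sketch below is a simplified Go model of the control flow, not the Kvrocks C++ code: the `conn` type, `sendReply`, `onWriteBuggy`, and `onWriteFixed` are hypothetical stand-ins, and `sendReply` writes only an array header where the real `SendReply()` would also write the entries.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for a client connection; it only records
// the raw replies written to it.
type conn struct{ sent []string }

func (c *conn) reply(s string) { c.sent = append(c.sent, s) }

// sendReply models SendReply() by writing an array header for the results.
func sendReply(c *conn, results []string) {
	c.reply(fmt.Sprintf("*%d\r\n", len(results)))
}

// onWriteBuggy falls through to sendReply after the null-array reply.
func onWriteBuggy(c *conn, results []string) {
	if len(results) == 0 {
		c.reply("*-1\r\n")
	}
	sendReply(c, results)
}

// onWriteFixed returns immediately after the null-array reply.
func onWriteFixed(c *conn, results []string) {
	if len(results) == 0 {
		c.reply("*-1\r\n")
		return
	}
	sendReply(c, results)
}

func main() {
	buggy, fixed := &conn{}, &conn{}
	onWriteBuggy(buggy, nil)
	onWriteFixed(fixed, nil)
	fmt.Printf("buggy: %q\n", buggy.sent) // buggy: ["*-1\r\n" "*0\r\n"]
	fmt.Printf("fixed: %q\n", fixed.sent) // fixed: ["*-1\r\n"]
}
```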

## Reproduction

The bug triggers under concurrent load when multiple clients are
blocking on `XREADGROUP BLOCK` while messages are being added via
`XADD`.

<details>
<summary>Reproduction script (Go)</summary>

```go
// Build: go build -o repro repro.go
// Run:   ./repro localhost:6666

package main

import (
        "bytes"
        "fmt"
        "net"
        "os"
        "strconv"
        "sync"
        "time"
)

var host = "localhost:6666"

func encodeCommand(args ...string) []byte {
        var buf bytes.Buffer
        buf.WriteString(fmt.Sprintf("*%d\r\n", len(args)))
        for _, arg := range args {
                buf.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg))
        }
        return buf.Bytes()
}

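func sendCommand(conn net.Conn, args ...string) ([]byte, error) {
        // Propagate I/O errors instead of silently discarding them.
        if _, err := conn.Write(encodeCommand(args...)); err != nil {
                return nil, err
        }
        buf := make([]byte, 4096)
        conn.SetReadDeadline(time.Now().Add(5 * time.Second))
        n, err := conn.Read(buf)
        if err != nil {
                return nil, err
        }
        return buf[:n], nil
}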

func main() {
        if len(os.Args) > 1 {
                host = os.Args[1]
        }

        stream, group := "test_stream", "test_group"

        // Setup
        conn, _ := net.Dial("tcp", host)
        sendCommand(conn, "DEL", stream)
        sendCommand(conn, "XGROUP", "CREATE", stream, group, "0", "MKSTREAM")
        conn.Close()

        var errors []string
        var mu sync.Mutex
        var wg sync.WaitGroup

        // 5 concurrent blocking readers
        for i := 0; i < 5; i++ {
                wg.Add(1)
                go func(id int) {
                        defer wg.Done()
                        conn, _ := net.Dial("tcp", host)
                        defer conn.Close()

                        for j := 0; j < 100; j++ {
                                cmd := encodeCommand("XREADGROUP", "GROUP", group, fmt.Sprintf("reader%d", id),
                                        "COUNT", "1", "BLOCK", "50", "STREAMS", stream, ">")
                                conn.Write(cmd)

                                buf := make([]byte, 4096)
                                conn.SetReadDeadline(time.Now().Add(2 * time.Second))
                                n, _ := conn.Read(buf)

                                if bytes.Contains(buf[:n], []byte("*-1\r\n*0\r\n")) {
                                        mu.Lock()
                                        errors = append(errors, fmt.Sprintf("Reader %d: got *-1\\r\\n*0\\r\\n", id))
                                        mu.Unlock()
                                        fmt.Printf("[BUG] Reader %d: received double response *-1\\r\\n*0\\r\\n\n", id)
                                        return
                                }
                        }
                }(i)
        }

        // Writer adding messages concurrently
        go func() {
                conn, _ := net.Dial("tcp", host)
                defer conn.Close()
                for i := 0; i < 200; i++ {
                        sendCommand(conn, "XADD", stream, "*", "msg", strconv.Itoa(i))
                        time.Sleep(5 * time.Millisecond)
                }
        }()

        wg.Wait()

        // Cleanup
        conn, _ = net.Dial("tcp", host)
        sendCommand(conn, "DEL", stream)
        conn.Close()

        if len(errors) > 0 {
                fmt.Printf("\nBUG REPRODUCED: %d occurrence(s)\n", len(errors))
                os.Exit(1)
        }
        fmt.Println("No bug detected this run (try again - it's a race condition)")
}
```

</details>

The go-redis client detects this and logs:
```
redis: pool.go:768: Conn has unread data (not push notification), removing it
```

I have verified locally that this fix eliminates the bug completely.

## Test

Added a test case `XREADGROUP BLOCK concurrent readers should not
receive double responses` in
`tests/gocase/unit/type/stream/stream_test.go` that exercises concurrent
blocking readers with concurrent writers.

## Type of change

- [x] Bug fix (non-breaking change which fixes an issue)

Resolves #3338

Report URL: https://github.com/apache/kvrocks/actions/runs/20846656216

With regards,
GitHub Actions via GitBox
