This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 25216331b fix(stream): add missing return after empty response in 
XREAD/XREADGROUP OnWrite (#3337)
25216331b is described below

commit 25216331bce6f9e97b945aad1cbb04e2db261287
Author: Nick Sweeting <[email protected]>
AuthorDate: Thu Jan 8 22:05:41 2026 -0500

    fix(stream): add missing return after empty response in XREAD/XREADGROUP 
OnWrite (#3337)
    
    When the `OnWrite()` callback for blocking `XREAD`/`XREADGROUP` commands
    receives empty results, it sends a null array response but then
    unconditionally calls `SendReply()`, which sends an additional empty
    array response.
    
    This causes clients to receive two RESP responses (`*-1\r\n*0\r\n`) for
    a single command, leading to protocol desynchronization.
    
    ### Root Cause
    
    **CommandXReadGroup::OnWrite() - lines 1666-1670:**
    ```cpp
    if (results.empty()) {
      conn_->Reply(redis::MultiLen(-1));  // Sends *-1\r\n
      // BUG: missing return here!
    }
    
    SendReply(results);  // Always called, sends *0\r\n when results is empty
    ```
    
    **CommandXRead::OnWrite() - lines 1354-1358:**
    ```cpp
    if (results.empty()) {
      conn_->Reply(conn_->NilArray());  // Sends *-1\r\n
      // BUG: missing return here!
    }
    
    SendReply(results);  // Always called, sends *0\r\n when results is empty
    ```
    
    ### Fix
    
    Add `return;` statements after sending the empty response to prevent the
    duplicate `SendReply()` call.
    
    ## Reproduction
    
    The bug triggers under concurrent load when multiple clients are
    blocking on `XREADGROUP BLOCK` while messages are being added via
    `XADD`.
    
    <details>
    <summary>Reproduction script (Go)</summary>
    
    ```go
    // Build: go build -o repro repro.go
    // Run:   ./repro localhost:6666
    
    package main
    
    import (
            "bytes"
            "fmt"
            "net"
            "os"
            "strconv"
            "sync"
            "time"
    )
    
    // host is the target server address ("host:port"); main overrides it
    // with the first CLI argument when one is given.
    var host = "localhost:6666"
    
    // encodeCommand serializes args as a RESP array of bulk strings,
    // e.g. ["PING"] -> "*1\r\n$4\r\nPING\r\n".
    func encodeCommand(args ...string) []byte {
            var out bytes.Buffer
            fmt.Fprintf(&out, "*%d\r\n", len(args))
            for _, a := range args {
                    fmt.Fprintf(&out, "$%d\r\n%s\r\n", len(a), a)
            }
            return out.Bytes()
    }
    
    // sendCommand sends one RESP command over conn and returns up to the
    // first 4096 bytes of the server's reply. Unlike the original, write
    // and read failures are reported to the caller instead of being
    // silently swallowed and returned as a nil error.
    func sendCommand(conn net.Conn, args ...string) ([]byte, error) {
            // A failed write previously went unnoticed and we read from a
            // dead connection; fail fast instead.
            if _, err := conn.Write(encodeCommand(args...)); err != nil {
                    return nil, err
            }
            // Bound the wait for the reply so a silent server cannot hang us.
            if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
                    return nil, err
            }
            buf := make([]byte, 4096)
            n, err := conn.Read(buf)
            if n > 0 {
                    // A partial read still carries a usable reply prefix.
                    return buf[:n], nil
            }
            return nil, err
    }
    
    func main() {
            if len(os.Args) > 1 {
                    host = os.Args[1]
            }
    
            stream, group := "test_stream", "test_group"
    
            // Setup
            conn, _ := net.Dial("tcp", host)
            sendCommand(conn, "DEL", stream)
            sendCommand(conn, "XGROUP", "CREATE", stream, group, "0", 
"MKSTREAM")
            conn.Close()
    
            var errors []string
            var mu sync.Mutex
            var wg sync.WaitGroup
    
            // 5 concurrent blocking readers
            for i := 0; i < 5; i++ {
                    wg.Add(1)
                    go func(id int) {
                            defer wg.Done()
                            conn, _ := net.Dial("tcp", host)
                            defer conn.Close()
    
                            for j := 0; j < 100; j++ {
                                    cmd := encodeCommand("XREADGROUP", "GROUP", 
group, fmt.Sprintf("reader%d", id),
                                            "COUNT", "1", "BLOCK", "50", 
"STREAMS", stream, ">")
                                    conn.Write(cmd)
    
                                    buf := make([]byte, 4096)
                                    conn.SetReadDeadline(time.Now().Add(2 * 
time.Second))
                                    n, _ := conn.Read(buf)
    
                                    if bytes.Contains(buf[:n], 
[]byte("*-1\r\n*0\r\n")) {
                                            mu.Lock()
                                            errors = append(errors, 
fmt.Sprintf("Reader %d: got *-1\\r\\n*0\\r\\n", id))
                                            mu.Unlock()
                                            fmt.Printf("[BUG] Reader %d: 
received double response *-1\\r\\n*0\\r\\n\n", id)
                                            return
                                    }
                            }
                    }(i)
            }
    
            // Writer adding messages concurrently
            go func() {
                    conn, _ := net.Dial("tcp", host)
                    defer conn.Close()
                    for i := 0; i < 200; i++ {
                            sendCommand(conn, "XADD", stream, "*", "msg", 
strconv.Itoa(i))
                            time.Sleep(5 * time.Millisecond)
                    }
            }()
    
            wg.Wait()
    
            // Cleanup
            conn, _ = net.Dial("tcp", host)
            sendCommand(conn, "DEL", stream)
            conn.Close()
    
            if len(errors) > 0 {
                    fmt.Printf("\nBUG REPRODUCED: %d occurrence(s)\n", 
len(errors))
                    os.Exit(1)
            }
            fmt.Println("No bug detected this run (try again - it's a race 
condition)")
    }
    ```
    
    </details>
    
    The go-redis client detects this and logs:
    ```
    redis: pool.go:768: Conn has unread data (not push notification), removing 
it
    ```
    
    I have verified locally that this fix eliminates the bug completely.
    
    ## Test
    
    Added a test case `XREADGROUP BLOCK concurrent readers should not
    receive double responses` in
    `tests/gocase/unit/type/stream/stream_test.go` that exercises concurrent
    blocking readers with concurrent writers.
    
    ## Type of change
    
    - [x] Bug fix (non-breaking change which fixes an issue)
    
    Resolves #3338
---
 src/commands/cmd_stream.cc                   |  2 +
 tests/gocase/unit/type/stream/stream_test.go | 66 ++++++++++++++++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f4c04a21c..233d8475f 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1353,6 +1353,7 @@ class CommandXRead : public Commander,
 
     if (results.empty()) {
       conn_->Reply(conn_->NilArray());
+      return;
     }
 
     SendReply(results);
@@ -1664,6 +1665,7 @@ class CommandXReadGroup : public Commander,
 
     if (results.empty()) {
       conn_->Reply(redis::MultiLen(-1));
+      return;
     }
 
     SendReply(results);
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index e97cde87e..4593d3c27 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2361,6 +2361,72 @@ func TestStreamOffset(t *testing.T) {
                require.Greater(t, pendingEntry.Idle, time.Millisecond)
                require.Less(t, pendingEntry.Idle, 10*time.Second)
        })
+
+       t.Run("XREADGROUP BLOCK concurrent readers should not receive double 
responses", func(t *testing.T) {
+               streamName := "test-concurrent-xreadgroup-block"
+               groupName := "test-group"
+
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, 
groupName, "0").Err())
+
+               // Create multiple clients to simulate concurrent blocking 
readers
+               numReaders := 5
+               numIterations := 20
+               clients := make([]*redis.Client, numReaders)
+               for i := 0; i < numReaders; i++ {
+                       clients[i] = srv.NewClient()
+                       defer clients[i].Close()
+               }
+
+               errorCh := make(chan error, numReaders*numIterations)
+               doneCh := make(chan bool, numReaders)
+
+               // Start concurrent blocking readers
+               for i := 0; i < numReaders; i++ {
+                       go func(id int) {
+                               defer func() { doneCh <- true }()
+                               for j := 0; j < numIterations; j++ {
+                                       _, err := clients[id].XReadGroup(ctx, 
&redis.XReadGroupArgs{
+                                               Group:    groupName,
+                                               Consumer: 
fmt.Sprintf("reader%d", id),
+                                               Streams:  []string{streamName, 
">"},
+                                               Count:    1,
+                                               Block:    50 * time.Millisecond,
+                                       }).Result()
+                                       // redis.Nil is expected when block 
times out with no data
+                                       if err != nil && err != redis.Nil {
+                                               errorCh <- err
+                                       }
+                               }
+                       }(i)
+               }
+
+               // Concurrently add messages while readers are blocking
+               go func() {
+                       for i := 0; i < 100; i++ {
+                               rdb.XAdd(ctx, &redis.XAddArgs{
+                                       Stream: streamName,
+                                       Values: []string{"msg", 
strconv.Itoa(i)},
+                               })
+                               time.Sleep(5 * time.Millisecond)
+                       }
+               }()
+
+               // Wait for all readers to complete
+               for i := 0; i < numReaders; i++ {
+                       <-doneCh
+               }
+               close(errorCh)
+
+               // Collect any errors - there should be none
+               var errors []error
+               for err := range errorCh {
+                       errors = append(errors, err)
+               }
+               require.Empty(t, errors, "XREADGROUP BLOCK should not produce 
protocol errors under concurrent load")
+
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to