This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 559a667be fix(replication): prevent WAL exhaustion from slow consumers 
(#3357)
559a667be is described below

commit 559a667bef223a07fea350a27168e91fe0a74601
Author: Mario de Frutos Dieguez <[email protected]>
AuthorDate: Wed Feb 4 02:36:05 2026 +0100

    fix(replication): prevent WAL exhaustion from slow consumers (#3357)
    
    The replication feed thread could block indefinitely when sending data
    to a slow replica. If the replica wasn't consuming data fast enough, the
    TCP send buffer would fill and the feed thread would block on write()
    with no timeout. During this time, WAL files would rotate and be pruned,
    leaving the replica's sequence unavailable when the thread eventually
    unblocked or the connection dropped.
    
    This commit adds three mechanisms to address the issue:
    
    1. Socket send timeout: New SockSendWithTimeout() function that uses
    poll() to wait for socket writability with a configurable timeout
    (default 30 seconds). This prevents indefinite blocking.
    
    2. Replication lag detection: At the start of each loop iteration, check
    if the replica has fallen too far behind (configurable via
    max-replication-lag, measured in sequences; the default is 0, which
    disables the check). If exceeded, disconnect the slow consumer before
    WAL is exhausted, allowing psync on reconnect.
    
    3. Exponential backoff on reconnection: When a replica is disconnected,
    it now waits with exponential backoff (1s, 2s, 4s... up to 60s) before
    reconnecting. This prevents rapid reconnection loops for persistently
    slow replicas. The backoff resets on successful psync or fullsync.
    
    New configuration options:
    - max-replication-lag: Maximum sequence lag before disconnecting
    (default: 0, i.e. the check is disabled)
    - replication-send-timeout-ms: Socket send timeout in ms (default:
    30000)
    
    Fixes https://github.com/apache/kvrocks/issues/3356
    
    ---------
    
    Co-authored-by: hulk <[email protected]>
---
 kvrocks.conf                                       |  15 +
 src/cluster/replication.cc                         |  38 ++-
 src/cluster/replication.h                          |   3 +
 src/common/io_util.cc                              |  97 +++++++
 src/common/io_util.h                               |   4 +
 src/config/config.cc                               |   2 +
 src/config/config.h                                |   2 +
 tests/cppunit/config_test.cc                       |   2 +
 tests/gocase/integration/cluster/cluster_test.go   |   7 +-
 .../integration/replication/replication_test.go    |  85 ++++++
 .../replication/slow_consumer_bug_test.go          | 320 +++++++++++++++++++++
 tests/gocase/util/client.go                        | 117 ++++++++
 12 files changed, 683 insertions(+), 9 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index ad52bb376..a4180cde1 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -231,6 +231,21 @@ replication-delay-bytes 16384
 # Default: 16 updates
 replication-delay-updates 16
 
+# Maximum sequence lag allowed before disconnecting a slow replica.
+# If a replica falls behind by more than this many sequences, the master will
+# disconnect it to prevent WAL exhaustion. The replica can then reconnect and
+# attempt partial sync (psync) if the sequence is still available.
+# Set to 0 to disable this check (default).
+# Default: 0 (disabled)
+max-replication-lag 0
+
+# Timeout in milliseconds for socket send operations to replicas.
+# If sending data to a replica blocks for longer than this timeout,
+# the connection will be dropped. This prevents the replication feed thread
+# from blocking indefinitely on slow consumers.
+# Default: 30000 (30 seconds)
+replication-send-timeout-ms 30000
+
 # TCP listen() backlog.
 #
 # In high requests-per-second environments you need an high backlog in order
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index bb211095c..f91a6a36e 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -63,7 +63,9 @@ FeedSlaveThread::FeedSlaveThread(Server *srv, 
redis::Connection *conn, rocksdb::
       next_repl_seq_(next_repl_seq),
       req_(srv),
       max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
-      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
+      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates),
+      max_replication_lag_(srv->GetConfig()->max_replication_lag),
+      send_timeout_ms_(srv->GetConfig()->replication_send_timeout_ms) {}
 
 Status FeedSlaveThread::Start() {
   auto s = util::CreateThread("feed-replica", [this] {
@@ -184,6 +186,21 @@ void FeedSlaveThread::loop() {
   while (!IsStopped()) {
     auto curr_seq = next_repl_seq_.load();
 
+    // Check replication lag - disconnect slow consumers before WAL is 
exhausted
+    // Skip check if max_replication_lag_ is 0 (feature disabled)
+    if (max_replication_lag_ > 0) {
+      auto latest_seq = srv_->storage->LatestSeqNumber();
+      if (latest_seq > curr_seq) {
+        auto lag = static_cast<int64_t>(latest_seq - curr_seq);
+        if (lag > max_replication_lag_) {
+          ERROR("Replication lag {} exceeds max allowed {} for slave {}:{}, 
disconnecting to prevent WAL exhaustion",
+                lag, max_replication_lag_, conn_->GetAnnounceIP(), 
conn_->GetListeningPort());
+          Stop();
+          return;
+        }
+      }
+    }
+
     if (!iter_ || !iter_->Valid()) {
       if (iter_) INFO("WAL was rotated, would reopen again");
       if (!srv_->storage->WALHasNewData(curr_seq) || 
!srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) {
@@ -221,10 +238,12 @@ void FeedSlaveThread::loop() {
         batches_bulk += redis::BulkString("_getack");
       }
 
-      // Send entire bulk which contain multiple batches
-      auto s = util::SockSend(conn_->GetFD(), batches_bulk, 
conn_->GetBufferEvent());
+      // Send entire bulk which contain multiple batches with timeout
+      // This prevents blocking indefinitely on slow consumers
+      auto s = util::SockSendWithTimeout(conn_->GetFD(), batches_bulk, 
conn_->GetBufferEvent(), send_timeout_ms_);
       if (!s.IsOK()) {
-        ERROR("Write error while sending batch to slave: {}. batches: 0x{}", 
s.Msg(), util::StringToHex(batches_bulk));
+        ERROR("Write error while sending batch to slave {}:{}: {}. 
batch_size={}", conn_->GetAnnounceIP(),
+              conn_->GetListeningPort(), s.Msg(), batches_bulk.size());
         Stop();
         return;
       }
@@ -260,9 +279,14 @@ void 
ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int
   }
   if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
     ERROR("[replication] connection error/eof, reconnect the master");
-    // Wait a bit and reconnect
+    // Wait with exponential backoff before reconnecting
+    constexpr int kMaxBackoffSeconds = 60;
+    constexpr int kMaxShiftBits = 6;  // Cap shift to avoid UB; 2^6 = 64 then 
clamped to 60
     repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
-    std::this_thread::sleep_for(std::chrono::seconds(1));
+    int attempts = repl_->reconnect_attempts_.fetch_add(1, 
std::memory_order_relaxed);
+    int backoff_secs = std::min(1 << std::min(attempts, kMaxShiftBits), 
kMaxBackoffSeconds);
+    WARN("[replication] waiting {} seconds before reconnecting (attempt {})", 
backoff_secs, attempts + 1);
+    std::this_thread::sleep_for(std::chrono::seconds(backoff_secs));
     Stop();
     Start();
   }
@@ -634,6 +658,7 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
   } else {
     // PSYNC is OK, use IncrementBatchLoop
     INFO("[replication] PSync is ok, start increment batch loop");
+    reconnect_attempts_.store(0, std::memory_order_relaxed);  // Reset backoff 
counter on successful connection
     return CBState::NEXT;
   }
 }
@@ -879,6 +904,7 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
         return CBState::RESTART;
       }
       INFO("[replication] Succeeded restoring the backup, fullsync was 
finish");
+      reconnect_attempts_.store(0, std::memory_order_relaxed);  // Reset 
backoff counter on successful fullsync
       post_fullsync_cb_();
 
       // It needs to reload namespaces from DB after the full sync is done,
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index ab9ab7ae2..5b6c8fbd1 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -91,6 +91,8 @@ class FeedSlaveThread {
   // Configurable delay limits
   size_t max_delay_bytes_;
   size_t max_delay_updates_;
+  int64_t max_replication_lag_;
+  int send_timeout_ms_;
 
   void loop();
   void checkLivenessIfNeed();
@@ -166,6 +168,7 @@ class ReplicationThread : private 
EventCallbackBase<ReplicationThread> {
   const bool replication_group_sync_ = false;
   std::atomic<int64_t> last_io_time_secs_ = 0;
   int64_t last_ack_time_secs_ = 0;
+  std::atomic<int> reconnect_attempts_ = 0;  // For exponential backoff on 
reconnection
   bool next_try_old_psync_ = false;
   bool next_try_without_announce_ip_address_ = false;
 
diff --git a/src/common/io_util.cc b/src/common/io_util.cc
index 1136dbb18..e481b1803 100644
--- a/src/common/io_util.cc
+++ b/src/common/io_util.cc
@@ -29,7 +29,10 @@
 #include <poll.h>
 #include <sys/types.h>
 
+#include <chrono>
+
 #include "fmt/ostream.h"
+#include "scope_exit.h"
 #include "server/tls_util.h"
 
 #ifdef __linux__
@@ -468,6 +471,100 @@ Status SockSend(int fd, const std::string &data, 
[[maybe_unused]] bufferevent *b
 #endif
 }
 
+Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms) {
+  // Fall back to blocking send if timeout is non-positive
+  if (timeout_ms <= 0) {
+    return SockSend(fd, data);
+  }
+
+  ssize_t n = 0;
+  auto start = std::chrono::steady_clock::now();
+
+  while (n < static_cast<ssize_t>(data.size())) {
+    // Check if we've exceeded the timeout
+    auto elapsed =
+        
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start).count();
+    if (elapsed >= timeout_ms) {
+      return {Status::NotOK, fmt::format("send timeout after {} ms, sent {} of 
{} bytes", elapsed, n, data.size())};
+    }
+
+    // Calculate remaining timeout
+    int remaining_ms = timeout_ms - static_cast<int>(elapsed);
+
+    // Wait for socket to be writable with timeout
+    int ready = AeWait(fd, AE_WRITABLE, remaining_ms);
+    if (ready == 0) {
+      return {Status::NotOK, fmt::format("send timeout waiting for socket, 
sent {} of {} bytes", n, data.size())};
+    }
+    if (ready < 0) {
+      return Status::FromErrno("poll error while sending");
+    }
+
+    ssize_t nwritten = write(fd, data.data() + n, data.size() - n);
+    if (nwritten == -1) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        // Socket buffer is full, continue waiting
+        continue;
+      }
+      return Status::FromErrno();
+    }
+    n += nwritten;
+  }
+  return Status::OK();
+}
+
+Status SockSendWithTimeout(int fd, const std::string &data, [[maybe_unused]] 
bufferevent *bev, int timeout_ms) {
+  // Fall back to blocking send if timeout is non-positive
+  if (timeout_ms <= 0) {
+    return SockSend(fd, data, bev);
+  }
+
+#ifdef ENABLE_OPENSSL
+  auto ssl = bufferevent_openssl_get_ssl(bev);
+  if (ssl) {
+    // Save original flags and set socket to non-blocking for timeout support
+    int orig_flags = fcntl(fd, F_GETFL);
+    if (orig_flags == -1) return Status::FromErrno("fcntl(F_GETFL)");
+
+    auto s = SockSetBlocking(fd, 0);
+    if (!s.IsOK()) return s;
+
+    // Restore original flags on scope exit
+    auto restore_flags = MakeScopeExit([fd, orig_flags] { fcntl(fd, F_SETFL, 
orig_flags); });
+
+    ssize_t n = 0;
+    auto start = std::chrono::steady_clock::now();
+
+    while (n < static_cast<ssize_t>(data.size())) {
+      auto elapsed =
+          
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start).count();
+      if (elapsed >= timeout_ms) {
+        return {Status::NotOK,
+                fmt::format("SSL send timeout after {} ms, sent {} of {} 
bytes", elapsed, n, data.size())};
+      }
+
+      int remaining_ms = timeout_ms - static_cast<int>(elapsed);
+      int ready = AeWait(fd, AE_WRITABLE, remaining_ms);
+      if (ready <= 0) {
+        return {Status::NotOK, fmt::format("SSL send timeout waiting for 
socket, sent {} of {} bytes", n, data.size())};
+      }
+
+      int nwritten = SSL_write(ssl, data.data() + n, 
static_cast<int>(data.size() - n));
+      if (nwritten <= 0) {
+        int err = SSL_get_error(ssl, nwritten);
+        if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) {
+          continue;
+        }
+        return {Status::NotOK, fmt::format("SSL_write error: {}", err)};
+      }
+      n += nwritten;
+    }
+    return Status::OK();
+  }
+#endif
+  return SockSendWithTimeout(fd, data, timeout_ms);
+}
+
 StatusOr<int> SockConnect(const std::string &host, uint32_t port, 
[[maybe_unused]] ssl_st *ssl, int conn_timeout,
                           int timeout) {
 #ifdef ENABLE_OPENSSL
diff --git a/src/common/io_util.h b/src/common/io_util.h
index d30789aea..161a30122 100644
--- a/src/common/io_util.h
+++ b/src/common/io_util.h
@@ -54,6 +54,10 @@ Status Pwrite(int fd, const std::string &data, off_t offset);
 Status SockSend(int fd, const std::string &data, ssl_st *ssl);
 Status SockSend(int fd, const std::string &data, bufferevent *bev);
 
+// Send with timeout - returns error if send would block for longer than 
timeout_ms
+Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms);
+Status SockSendWithTimeout(int fd, const std::string &data, bufferevent *bev, 
int timeout_ms);
+
 Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl);
 Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev);
 
diff --git a/src/config/config.cc b/src/config/config.cc
index dd82f8ee7..316948091 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -206,6 +206,8 @@ Config::Config() {
       {"replication-no-slowdown", false, new 
YesNoField(&replication_no_slowdown, true)},
       {"replication-delay-bytes", false, new 
IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
       {"replication-delay-updates", false, new 
IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
+      {"max-replication-lag", false, new Int64Field(&max_replication_lag, 0, 
0, INT64_MAX)},
+      {"replication-send-timeout-ms", false, new 
IntField(&replication_send_timeout_ms, 30000, 1000, 300000)},
       {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
       {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 
0, 0, 100)},
       {"profiling-sample-record-max-len", false, new 
IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
diff --git a/src/config/config.h b/src/config/config.h
index 4fbb80137..675bbd34c 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -126,6 +126,8 @@ struct Config {
   int replication_recv_timeout_ms = 3200;
   int max_replication_delay_bytes = 16 * 1024;  // 16KB default
   int max_replication_delay_updates = 16;       // 16 updates default
+  int64_t max_replication_lag = 0;          // 0 = disabled, otherwise max 
sequences before disconnecting slow consumer
+  int replication_send_timeout_ms = 30000;  // 30 second timeout for socket 
sends to replicas
   int max_db_size = 0;
   int max_replication_mb = 0;
   int max_io_mb = 0;
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index f339a2ae0..b7d931be4 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -86,6 +86,8 @@ TEST(Config, GetAndSet) {
       {"rocksdb.max_background_jobs", "4"},
       {"rocksdb.compression_start_level", "2"},
       {"rocksdb.sst_file_delete_rate_bytes_per_sec", "0"},
+      {"max-replication-lag", "50000000"},
+      {"replication-send-timeout-ms", "60000"},
   };
   std::vector<std::string> values;
   for (const auto &iter : mutable_cases) {
diff --git a/tests/gocase/integration/cluster/cluster_test.go 
b/tests/gocase/integration/cluster/cluster_test.go
index 489fab266..79008bf4a 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -574,9 +574,10 @@ func TestClusterReset(t *testing.T) {
 
        t.Run("cannot reset cluster if the db is migrating the slot", func(t 
*testing.T) {
                slotNum := 2
-               // slow down the migration speed to avoid breaking other test 
cases
-               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", 
"128").Err())
-               for i := 0; i < 1024; i++ {
+               // slow down the migration speed to ensure we can observe the 
"start" state
+               // before migration completes (especially on fast hardware like 
macOS ARM)
+               require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", 
"64").Err())
+               for i := 0; i < 2048; i++ {
                        require.NoError(t, rdb0.RPush(ctx, "my-list", 
fmt.Sprintf("element%d", i)).Err())
                }
 
diff --git a/tests/gocase/integration/replication/replication_test.go 
b/tests/gocase/integration/replication/replication_test.go
index 39ed535ca..6e291ecc5 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -711,3 +711,88 @@ func TestReplicationWatermark(t *testing.T) {
        // The small command should be processed much faster than 1 second
        require.Less(t, duration, 1*time.Second, "small command should be 
processed promptly")
 }
+
+func TestReplicationSlowConsumerConfig(t *testing.T) {
+       t.Parallel()
+       ctx := context.Background()
+
+       // This test verifies the slow consumer protection config options are 
working:
+       // - max-replication-lag: threshold before disconnecting slow consumers
+       // - replication-send-timeout-ms: timeout for socket sends to replicas
+       master := util.StartServer(t, map[string]string{
+               "max-replication-lag":         "100000000",
+               "replication-send-timeout-ms": "30000",
+       })
+       defer master.Close()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+
+       slave := util.StartServer(t, map[string]string{})
+       defer slave.Close()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+
+       t.Run("Slow consumer config options are readable and settable", func(t 
*testing.T) {
+               // Verify initial config values
+               maxLag := masterClient.ConfigGet(ctx, 
"max-replication-lag").Val()
+               require.Equal(t, "100000000", maxLag["max-replication-lag"])
+
+               sendTimeout := masterClient.ConfigGet(ctx, 
"replication-send-timeout-ms").Val()
+               require.Equal(t, "30000", 
sendTimeout["replication-send-timeout-ms"])
+
+               // Test CONFIG SET for max-replication-lag
+               require.NoError(t, masterClient.ConfigSet(ctx, 
"max-replication-lag", "50000000").Err())
+               maxLag = masterClient.ConfigGet(ctx, 
"max-replication-lag").Val()
+               require.Equal(t, "50000000", maxLag["max-replication-lag"])
+
+               // Test CONFIG SET for replication-send-timeout-ms
+               require.NoError(t, masterClient.ConfigSet(ctx, 
"replication-send-timeout-ms", "15000").Err())
+               sendTimeout = masterClient.ConfigGet(ctx, 
"replication-send-timeout-ms").Val()
+               require.Equal(t, "15000", 
sendTimeout["replication-send-timeout-ms"])
+
+               // Verify replication still works normally with these config 
options
+               util.SlaveOf(t, slaveClient, master)
+               util.WaitForSync(t, slaveClient)
+               require.Equal(t, "slave", util.FindInfoEntry(slaveClient, 
"role"))
+
+               require.NoError(t, masterClient.Set(ctx, "test_key", 
"test_value", 0).Err())
+               util.WaitForOffsetSync(t, masterClient, slaveClient, 
5*time.Second)
+               require.Equal(t, "test_value", slaveClient.Get(ctx, 
"test_key").Val())
+       })
+}
+
+func TestReplicationExponentialBackoff(t *testing.T) {
+       t.Parallel()
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{})
+       defer master.Close()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+
+       slave := util.StartServer(t, map[string]string{})
+       defer slave.Close()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+
+       t.Run("Slave uses exponential backoff on reconnection", func(t 
*testing.T) {
+               // Connect slave to master
+               util.SlaveOf(t, slaveClient, master)
+               util.WaitForSync(t, slaveClient)
+
+               // Kill the slave connection from master side to trigger 
reconnection
+               _, err := masterClient.ClientKillByFilter(ctx, "type", 
"slave").Result()
+               require.NoError(t, err)
+
+               // The slave should log backoff messages when reconnecting
+               // First reconnection attempt should wait 1 second
+               require.Eventually(t, func() bool {
+                       return slave.LogFileMatches(t, ".*waiting 1 seconds 
before reconnecting.*")
+               }, 10*time.Second, 200*time.Millisecond, "slave should log 
backoff on first reconnection")
+
+               // Slave should eventually reconnect
+               require.Eventually(t, func() bool {
+                       return util.FindInfoEntry(slaveClient, 
"master_link_status") == "up"
+               }, 15*time.Second, 500*time.Millisecond, "slave should 
reconnect with backoff")
+       })
+}
diff --git a/tests/gocase/integration/replication/slow_consumer_bug_test.go 
b/tests/gocase/integration/replication/slow_consumer_bug_test.go
new file mode 100644
index 000000000..ace9aa179
--- /dev/null
+++ b/tests/gocase/integration/replication/slow_consumer_bug_test.go
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package replication
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/require"
+)
+
+// TestSlowConsumerBug demonstrates the bug where a slow consumer can cause
+// the master's FeedSlaveThread to block indefinitely.
+//
+// BUG DESCRIPTION:
+// When a replica can't consume data fast enough, the master's FeedSlaveThread
+// blocks on write() with no timeout. This can cause:
+// 1. The feed thread to be stuck indefinitely
+// 2. WAL files to rotate and be pruned while the thread is blocked
+// 3. When the connection finally drops, the replica can't resume (psync fails)
+//
+// EXPECTED BEHAVIOR (with fix):
+// - Master should timeout the send operation after a configurable period
+// - Master should detect excessive lag and proactively disconnect slow 
consumers
+// - Replica should use exponential backoff when reconnecting
+//
+// WITHOUT THE FIX: This test will hang or take a very long time
+// WITH THE FIX: The master should disconnect the slow consumer quickly
+func TestSlowConsumerBug(t *testing.T) {
+       t.Parallel()
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{})
+       defer master.Close()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+
+       // Create a pausable proxy
+       proxyCtx, cancelProxy := context.WithCancel(ctx)
+       defer cancelProxy()
+       pauseCh := make(chan bool, 1)
+       proxyPort := util.PausableTCPProxy(proxyCtx, t, 
fmt.Sprintf("127.0.0.1:%d", master.Port()), pauseCh)
+
+       slave := util.StartServer(t, map[string]string{})
+       defer slave.Close()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+
+       // Connect slave through proxy
+       require.NoError(t, slaveClient.SlaveOf(ctx, "127.0.0.1", 
fmt.Sprintf("%d", proxyPort)).Err())
+
+       // Wait for initial sync
+       require.Eventually(t, func() bool {
+               return util.FindInfoEntry(slaveClient, "master_link_status") == 
"up"
+       }, 10*time.Second, 100*time.Millisecond, "slave should connect")
+
+       // Sync some initial data
+       require.NoError(t, masterClient.Set(ctx, "init_key", "init_value", 
0).Err())
+       util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second)
+       t.Log("Initial sync completed")
+
+       // PAUSE the proxy - this simulates a slow/stuck consumer
+       t.Log("Pausing proxy to simulate slow consumer...")
+       pauseCh <- true
+       time.Sleep(200 * time.Millisecond)
+
+       // Write data to master - this will cause the FeedSlaveThread to try 
sending
+       t.Log("Writing data to master while consumer is stuck...")
+       value := strings.Repeat("x", 4096) // 4KB value
+       for i := 0; i < 20; i++ {
+               require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key_%d", 
i), value, 0).Err())
+       }
+       t.Log("Data written to master")
+
+       // Now try to kill the slave connection from master
+       // WITHOUT THE FIX: This will hang because the FeedSlaveThread is 
blocked on write()
+       // WITH THE FIX: This should complete quickly due to send timeout
+       t.Log("Attempting to disconnect slow consumer from master...")
+
+       startTime := time.Now()
+
+       // Try to kill the slave connection - this should trigger the feed 
thread to notice
+       // and handle the disconnection. Without the fix, the thread may be 
stuck in write()
+       _, err := masterClient.ClientKillByFilter(ctx, "type", "slave").Result()
+       if err != nil {
+               t.Logf("ClientKill result: %v", err)
+       }
+
+       // Check how long it takes for the master to recognize the slave is 
disconnected
+       // Without the fix, the FeedSlaveThread may still be blocked trying to 
write
+       disconnectDetected := false
+       for i := 0; i < 50; i++ { // Check for up to 5 seconds
+               time.Sleep(100 * time.Millisecond)
+               connectedSlaves := util.FindInfoEntry(masterClient, 
"connected_slaves")
+               if connectedSlaves == "0" {
+                       disconnectDetected = true
+                       break
+               }
+       }
+
+       elapsed := time.Since(startTime)
+       t.Logf("Time to detect disconnection: %v", elapsed)
+
+       // Resume proxy for cleanup
+       pauseCh <- false
+
+       if !disconnectDetected {
+               t.Log("WARNING: Master did not detect slave disconnection 
within 5 seconds")
+               t.Log("This indicates the FeedSlaveThread may be blocked - 
demonstrating the bug")
+       }
+
+       // The key assertion: with the fix, disconnection should be detected 
quickly
+       // Without the fix, it may take much longer or not be detected at all
+       if elapsed > 10*time.Second {
+               t.Logf("BUG DEMONSTRATED: Disconnection took %v (>10s), 
indicating blocked FeedSlaveThread", elapsed)
+       } else {
+               t.Logf("Disconnection detected in %v", elapsed)
+       }
+
+       // Final check: slave should be able to reconnect eventually
+       t.Log("Checking if slave can reconnect...")
+       require.Eventually(t, func() bool {
+               return util.FindInfoEntry(slaveClient, "master_link_status") == 
"up"
+       }, 30*time.Second, 500*time.Millisecond, "slave should eventually 
reconnect")
+       t.Log("Slave reconnected successfully")
+}
+
+// TestSlowConsumerBlocksIndefinitely demonstrates the core bug:
+// Without the fix, the master's FeedSlaveThread can stay blocked INDEFINITELY
+// when a consumer is stuck. In production, this has been observed to last 44+ 
HOURS.
+//
+// WHY IT CAN LAST SO LONG:
+// 1. TCP keepalive doesn't help if the connection is technically "alive"
+// 2. If the slow consumer accepts SOME data (just very slowly), TCP won't 
timeout
+// 3. Without application-level timeout, write() blocks forever waiting for 
buffer space
+// 4. The FeedSlaveThread has no mechanism to detect it's stuck
+//
+// CONSEQUENCES:
+// 1. WAL files rotate and get pruned while the thread is blocked
+// 2. When connection finally drops, the replica can't psync (sequence 
unavailable)
+// 3. Full sync is required, causing significant load and downtime
+//
+// This test shows:
+// 1. When proxy is paused, replication data accumulates (lag increases)
+// 2. The master keeps the connection as "connected" even though no data flows
+// 3. Without explicit intervention, this state persists INDEFINITELY
+func TestSlowConsumerBlocksIndefinitely(t *testing.T) {
+       // Regression test for the slow-consumer protection: a replica that stops
+       // reading must be disconnected by the master (via the send timeout or the
+       // lag limit) instead of staying "connected" while no data flows.
+       t.Parallel()
+       ctx := context.Background()
+
+       // Use low limits so the protection triggers quickly:
+       // - max-replication-lag: disconnect when lag exceeds this
+       // - replication-send-timeout-ms: timeout on blocked sends
+       master := util.StartServer(t, map[string]string{
+               "max-replication-lag":         "50",   // Very low: disconnect when lag > 50 sequences
+               "replication-send-timeout-ms": "3000", // 3 second timeout
+       })
+       defer master.Close()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+
+       // Route replication through a proxy that can stop reading from the master.
+       proxyCtx, cancelProxy := context.WithCancel(ctx)
+       defer cancelProxy()
+       pauseCh := make(chan bool, 1)
+       proxyPort := util.PausableTCPProxy(proxyCtx, t, fmt.Sprintf("127.0.0.1:%d", master.Port()), pauseCh)
+
+       slave := util.StartServer(t, map[string]string{})
+       defer slave.Close()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+
+       // Connect and sync
+       require.NoError(t, slaveClient.SlaveOf(ctx, "127.0.0.1", fmt.Sprintf("%d", proxyPort)).Err())
+       require.Eventually(t, func() bool {
+               return util.FindInfoEntry(slaveClient, "master_link_status") == "up"
+       }, 10*time.Second, 100*time.Millisecond)
+
+       require.NoError(t, masterClient.Set(ctx, "init", "value", 0).Err())
+       util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second)
+
+       initialOffset := util.FindInfoEntry(masterClient, "master_repl_offset")
+       t.Logf("Initial master offset: %s", initialOffset)
+
+       // Pause proxy to simulate stuck consumer
+       t.Log("=== SIMULATING SLOW CONSUMER (proxy paused) ===")
+       pauseCh <- true
+       // Give the proxy's goroutines time to observe the pause before writing.
+       time.Sleep(200 * time.Millisecond)
+
+       // Write ~3.2MB (50 x 64KB) so that the kernel TCP buffers and the proxy's
+       // internal buffers fill up and the master's sends start to block.
+       t.Log("Writing large amount of data to fill TCP buffers...")
+       value := strings.Repeat("x", 64*1024) // 64KB value
+       for i := 0; i < 50; i++ {
+               require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("large_key_%d", i), value, 0).Err())
+       }
+
+       finalOffset := util.FindInfoEntry(masterClient, "master_repl_offset")
+       t.Logf("Master offset after writes: %s (was %s)", finalOffset, initialOffset)
+
+       // Poll once per second. The send timeout is 3s, so the master is expected
+       // to drop the replica well within this window.
+       const observeSeconds = 15
+       disconnected := false
+       for elapsed := 1; elapsed <= observeSeconds; elapsed++ {
+               time.Sleep(time.Second)
+
+               connectedSlaves := util.FindInfoEntry(masterClient, "connected_slaves")
+               if connectedSlaves == "0" {
+                       t.Logf("After %ds: master detected the slow consumer and disconnected it", elapsed)
+                       disconnected = true
+                       break
+               }
+               t.Logf("After %ds: connected_slaves=%s, slave0=%s (still connected, no data flowing)",
+                       elapsed, connectedSlaves, util.FindInfoEntry(masterClient, "slave0"))
+       }
+
+       // Resume proxy for cleanup
+       pauseCh <- false
+
+       // Assert unconditionally: the test must fail when the master keeps a
+       // stalled replica connected, otherwise it cannot catch a regression.
+       require.True(t, disconnected,
+               "master should disconnect the slow consumer within %d seconds (send timeout or lag detection)",
+               observeSeconds)
+}
+
+// TestNoSendTimeoutConfig verifies that the send timeout config doesn't exist
+// in the unfixed version. This test should FAIL on the fixed version.
+func TestNoSendTimeoutConfig(t *testing.T) {
+       t.Parallel()
+       ctx := context.Background()
+
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+       client := srv.NewClient()
+       defer func() { require.NoError(t, client.Close()) }()
+
+       // These config options should NOT exist in the unfixed version
+       _, err := client.ConfigGet(ctx, "max-replication-lag").Result()
+       if err != nil {
+               t.Logf("max-replication-lag config not found (expected in 
unfixed version): %v", err)
+       } else {
+               result := client.ConfigGet(ctx, "max-replication-lag").Val()
+               if len(result) == 0 || result["max-replication-lag"] == "" {
+                       t.Log("max-replication-lag config does not exist 
(UNFIXED VERSION)")
+               } else {
+                       t.Logf("max-replication-lag exists with value: %s 
(FIXED VERSION)", result["max-replication-lag"])
+               }
+       }
+
+       _, err = client.ConfigGet(ctx, "replication-send-timeout-ms").Result()
+       if err != nil {
+               t.Logf("replication-send-timeout-ms config not found (expected 
in unfixed version): %v", err)
+       } else {
+               result := client.ConfigGet(ctx, 
"replication-send-timeout-ms").Val()
+               if len(result) == 0 || result["replication-send-timeout-ms"] == 
"" {
+                       t.Log("replication-send-timeout-ms config does not 
exist (UNFIXED VERSION)")
+               } else {
+                       t.Logf("replication-send-timeout-ms exists with value: 
%s (FIXED VERSION)", result["replication-send-timeout-ms"])
+               }
+       }
+}
diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go
index 180163b55..49a4e929a 100644
--- a/tests/gocase/util/client.go
+++ b/tests/gocase/util/client.go
@@ -27,6 +27,7 @@ import (
        "net"
        "regexp"
        "strings"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -151,3 +152,119 @@ func SimpleTCPProxy(ctx context.Context, t testing.TB, to 
string, slowdown bool)
        }()
        return uint64(addr.Port)
 }
+
+// PausableTCPProxy starts a TCP proxy listening on a free local port that
+// forwards every accepted connection to the address `to`, and returns the
+// port it listens on.
+//
+// Send true on pauseCh to pause, false to resume. While paused, the proxy
+// stops reading from the `to` side only (the "to_slave" direction); data
+// flowing towards `to` is still forwarded. The intent is that the sender's
+// TCP buffer fills up and its writes eventually block.
+//
+// The proxy keeps accepting connections until ctx is cancelled. Setup
+// failures (no free port, listen error) abort the test via t.Fatalf.
+func PausableTCPProxy(ctx context.Context, t testing.TB, to string, pauseCh 
<-chan bool) uint64 {
+       addr, err := findFreePort()
+       if err != nil {
+               t.Fatalf("can't find a free port, %v", err)
+       }
+       from := addr.String()
+
+       listener, err := net.Listen("tcp", from)
+       if err != nil {
+               t.Fatalf("listen to %s failed, err: %v", from, err)
+       }
+
+       // Shared pause flag: written by the signal goroutine below, read by every
+       // copy loop.
+       paused := &atomic.Bool{}
+
+       // Goroutine to handle pause/resume signals
+       go func() {
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case p := <-pauseCh:
+                               paused.Store(p)
+                       }
+               }
+       }()
+
+       // copyBytes returns a function that forwards bytes from src to dest until
+       // ctx is cancelled, src reports EOF, or a non-timeout error occurs.
+       // direction is "to_slave" or "to_master"; only "to_slave" honours the pause.
+       copyBytes := func(src, dest io.ReadWriter, direction string) func() 
error {
+               // One 4KB buffer per direction, reused for every read.
+               buffer := make([]byte, 4096)
+               return func() error {
+               COPY_LOOP:
+                       for {
+                               select {
+                               case <-ctx.Done():
+                                       t.Log("forwarding tcp stream stopped")
+                                       break COPY_LOOP
+                               default:
+                                       // When paused, only stop reading in the "to_slave" direction.
+                                       // The unread data is meant to back up into the sender's send
+                                       // buffer so that its writes eventually block.
+                                       if paused.Load() && direction == 
"to_slave" {
+                                               time.Sleep(time.Millisecond * 
100)
+                                               continue
+                                       }
+
+                                       // Set a short read deadline so the loop regularly re-checks
+                                       // the pause flag and ctx instead of blocking in Read.
+                                       if conn, ok := src.(net.Conn); ok {
+                                               _ = 
conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
+                                       }
+
+                                       n, err := src.Read(buffer)
+                                       if err != nil {
+                                               // EOF ends this direction without reporting an error.
+                                               if errors.Is(err, io.EOF) {
+                                                       break COPY_LOOP
+                                               }
+                                               // A read-deadline timeout is expected: loop again.
+                                               if netErr, ok := 
err.(net.Error); ok && netErr.Timeout() {
+                                                       continue
+                                               }
+                                               return err
+                                       }
+                                       _, err = dest.Write(buffer[:n])
+                                       if err != nil {
+                                               if errors.Is(err, io.EOF) {
+                                                       break COPY_LOOP
+                                               }
+                                               return err
+                                       }
+                               }
+                       }
+                       return nil
+               }
+       }
+
+       // Accept loop: runs until ctx is cancelled, then closes the listener.
+       go func() {
+               defer listener.Close()
+       LISTEN_LOOP:
+               for {
+                       select {
+                       case <-ctx.Done():
+                               break LISTEN_LOOP
+
+                       default:
+                               // Short accept deadline so ctx cancellation is noticed promptly.
+                               _ = 
listener.(*net.TCPListener).SetDeadline(time.Now().Add(100 * time.Millisecond))
+                               conn, err := listener.Accept()
+                               if err != nil {
+                                       // Deadline timeouts are expected and not logged.
+                                       if netErr, ok := err.(net.Error); ok && 
netErr.Timeout() {
+                                               continue
+                                       }
+                                       t.Logf("accept conn failed, err: %v", 
err)
+                                       continue
+                               }
+                               // Open a dedicated upstream connection for this client.
+                               dest, err := net.Dial("tcp", to)
+                               if err != nil {
+                                       t.Logf("dial to %s failed, err: %v", 
to, err)
+                                       conn.Close()
+                                       continue
+                               }
+                               go func() {
+                                       var errGrp errgroup.Group
+                                       // conn is the accepted (slave-side) connection, dest is the
+                                       // dialed (master-side) connection.
+                                       // "to_slave"  = read from master (dest), write to slave (conn)
+                                       // "to_master" = read from slave (conn), write to master (dest)
+                                       errGrp.Go(copyBytes(dest, conn, 
"to_slave"))
+                                       errGrp.Go(copyBytes(conn, dest, 
"to_master"))
+                                       // NOTE(review): Wait presumably returns only after both copy
+                                       // functions have returned, so when one direction ends both
+                                       // connections appear to stay open until the other direction
+                                       // ends too - confirm this is intended.
+                                       _ = errGrp.Wait()
+                                       conn.Close()
+                                       dest.Close()
+                               }()
+                       }
+               }
+       }()
+       return uint64(addr.Port)
+}


Reply via email to