This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 54929cd01 fix(replication): fix stuck replication due to wrong watermark (#3243)
54929cd01 is described below

commit 54929cd01603f9a19cd3b700b8b4da1683e4eda9
Author: zhixinwen <[email protected]>
AuthorDate: Sat Nov 1 08:31:25 2025 -0700

    fix(replication): fix stuck replication due to wrong watermark (#3243)
    
    In order to reduce unnecessary callbacks, we set read watermark based on
    data size:
    
    
https://github.com/apache/kvrocks/blob/2b7e69a5acd9e1490b96aafc84746d1baef6579e/src/cluster/replication.cc#L700
    
    However, this watermark is not reset after reading the data, which could
    result in stuck replication. For example:
1. A batch of size 16kb is received by the replica, and the watermark is
set to 16kb.
2. A PING is received after the batch by the replica, and the callback
returns without resetting the watermark. The replica would continue to
process data only after the master sends at least 16kb of data.
3. The master only sent 1kb of data, and that data is not processed by
the replica until the master sends enough PINGs to trigger the 16kb
watermark. This causes the replication to get stuck.
---
 src/cluster/replication.cc                         |  4 ++
 .../integration/replication/replication_test.go    | 43 ++++++++++++++++++++++
 2 files changed, 47 insertions(+)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 993ed9825..21f484208 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -680,6 +680,8 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
           if (data_written) {
             sendReplConfAck(bev, force_ack);
           }
+          // We should reset the watermark to 0 to read the next RESP parts after reading a batch.
+          bufferevent_setwatermark(bev, EV_READ, 0, 0);
           return CBState::AGAIN;
         }
         incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0;
@@ -714,6 +716,8 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
           // when force_ack is false. As a result, if the last write did not trigger ack, the replication would not send
           // ack forever and the info command on master would report incorrect lag.
           sendReplConfAck(bev, force_ack);
+          // We should reset the watermark to 0 to read the next RESP parts after reading a batch.
+          bufferevent_setwatermark(bev, EV_READ, 0, 0);
           return CBState::AGAIN;
         }
 
diff --git a/tests/gocase/integration/replication/replication_test.go b/tests/gocase/integration/replication/replication_test.go
index 81fed5683..39ed535ca 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -668,3 +668,46 @@ func TestReplicationGroupSyncConfig(t *testing.T) {
                require.Equal(t, "value2", slaveClient2.Get(ctx, "key2").Val())
        })
 }
+
+func TestReplicationWatermark(t *testing.T) {
+       t.Parallel()
+       master := util.StartServer(t, map[string]string{})
+       defer master.Close()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+
+       slave := util.StartServer(t, map[string]string{})
+       defer slave.Close()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+
+       ctx := context.Background()
+       util.SlaveOf(t, slaveClient, master)
+       util.WaitForSync(t, slaveClient)
+
+       // Send a large SET command to trigger a high watermark in the slave
+       largeValue := strings.Repeat("a", 16*1024) // 16KB value
+       require.NoError(t, masterClient.Set(ctx, "large_key", largeValue, 0).Err())
+
+       // Wait a bit for the large command to be processed
+       time.Sleep(50 * time.Millisecond)
+
+       // Immediately send a small SET command
+       // Without the fix, this would be delayed due to the high watermark
+       start := time.Now()
+       require.NoError(t, masterClient.Set(ctx, "small_key", "small_value", 0).Err())
+
+       // Check if the small SET is processed quickly on the slave
+       // The small command should appear within 1 second (much faster than the buggy 1 minute delay)
+       require.Eventually(t, func() bool {
+               val, err := slaveClient.Get(ctx, "small_key").Result()
+               if err != nil {
+                       return false
+               }
+               return val == "small_value"
+       }, 1*time.Second, 50*time.Millisecond, "slave should process small command quickly after large command")
+
+       duration := time.Since(start)
+       // The small command should be processed much faster than 1 second
+       require.Less(t, duration, 1*time.Second, "small command should be processed promptly")
+}

Reply via email to