This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 2836bf02 chore(tests): add basic test case for the full+partial sync (#2522)
2836bf02 is described below

commit 2836bf02877fa9853af6047dc35b206dea7330f6
Author: hulk <[email protected]>
AuthorDate: Fri Sep 6 12:08:44 2024 +0800

    chore(tests): add basic test case for the full+partial sync (#2522)
---
 tests/gocase/integration/cluster/cluster_test.go   |  4 +-
 .../integration/replication/replication_test.go    | 50 ++++++++++++++++++++--
 tests/gocase/integration/rsid/rsid_test.go         |  2 +-
 tests/gocase/tls/tls_test.go                       |  4 +-
 tests/gocase/unit/namespace/namespace_test.go      |  7 +--
 tests/gocase/unit/scripting/scripting_test.go      |  5 ++-
 tests/gocase/util/client.go                        |  4 +-
 7 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/tests/gocase/integration/cluster/cluster_test.go 
b/tests/gocase/integration/cluster/cluster_test.go
index 541c2a99..3c75e9ff 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -399,7 +399,7 @@ func TestClusterMultiple(t *testing.T) {
                // request replicas a write command, it's wrong
                require.ErrorContains(t, rdb[3].Set(ctx, util.SlotTable[16383], 
16383, 0).Err(), "MOVED")
                // request a read-only command to node3 that serve slot 16383, 
that's ok
-               util.WaitForOffsetSync(t, rdb[2], rdb[3])
+               util.WaitForOffsetSync(t, rdb[2], rdb[3], 5*time.Second)
                //the default option is READWRITE, which will redirect both 
read and write to master
                require.ErrorContains(t, rdb[3].Get(ctx, 
util.SlotTable[16383]).Err(), "MOVED")
 
@@ -445,7 +445,7 @@ func TestClusterMultiple(t *testing.T) {
 
                require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())
                require.NoError(t, rdb[2].Set(ctx, util.SlotTable[8192], 8192, 
0).Err())
-               util.WaitForOffsetSync(t, rdb[2], rdb[3])
+               util.WaitForOffsetSync(t, rdb[2], rdb[3], 5*time.Second)
                // request node3 that serves slot 8192, that's ok
                require.Equal(t, "8192", rdb[3].Get(ctx, 
util.SlotTable[8192]).Val())
 
diff --git a/tests/gocase/integration/replication/replication_test.go 
b/tests/gocase/integration/replication/replication_test.go
index 7adc9862..bb88f022 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -66,7 +66,7 @@ func TestClusterReplication(t *testing.T) {
                require.Equal(t, "slave", util.FindInfoEntry(replicaClient, 
"role"))
                masterClient.Set(ctx, "k0", "v0", 0)
                masterClient.LPush(ctx, "k1", "e0", "e1", "e2")
-               util.WaitForOffsetSync(t, masterClient, replicaClient)
+               util.WaitForOffsetSync(t, masterClient, replicaClient, 
5*time.Second)
 
                require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val())
                require.Equal(t, []string{"e2", "e1", "e0"}, 
replicaClient.LRange(ctx, "k1", 0, -1).Val())
@@ -84,7 +84,7 @@ func TestClusterReplication(t *testing.T) {
                // allow to run the read-only command in the replica
                require.NoError(t, replicaClient.ReadOnly(ctx).Err())
 
-               util.WaitForOffsetSync(t, masterClient, replicaClient)
+               util.WaitForOffsetSync(t, masterClient, replicaClient, 
5*time.Second)
                require.Equal(t, "v1", replicaClient.Get(ctx, "k0").Val())
                require.Equal(t, map[string]string{"f0": "v0", "f1": "v1"}, 
replicaClient.HGetAll(ctx, "k2").Val())
        })
@@ -387,7 +387,7 @@ func TestReplicationContinueRunning(t *testing.T) {
                        "0": 0, "1": 1, "2": 2, "3": 3, "4": 4, "5": 5, "6": 6, 
"7": 7, "8": 8, "9": 9,
                        "a": "a", "b": "b", "c": "c", "d": "d", "e": "e", "f": 
"f", "g": "g", "h": "h", "i": "i", "j": "j", "k": "k"})
                require.EqualValues(t, 21, masterClient.HLen(ctx, 
"myhash").Val())
-               util.WaitForOffsetSync(t, masterClient, slaveClient)
+               util.WaitForOffsetSync(t, masterClient, slaveClient, 
5*time.Second)
                require.Equal(t, "1", slaveClient.HGet(ctx, "myhash", 
"1").Val())
                require.Equal(t, "a", slaveClient.HGet(ctx, "myhash", 
"a").Val())
        })
@@ -508,3 +508,47 @@ func TestShouldNotReplicate(t *testing.T) {
                require.Equal(t, "master", util.FindInfoEntry(masterClient, 
"role"))
        })
 }
+
+func TestFullSyncReplication(t *testing.T) {
+       master := util.StartServer(t, map[string]string{
+               "rocksdb.write_buffer_size":       "4",
+               "rocksdb.target_file_size_base":   "16",
+               "rocksdb.max_write_buffer_number": "1",
+               "rocksdb.wal_ttl_seconds":         "0",
+               "rocksdb.wal_size_limit_mb":       "0",
+       })
+       defer master.Close()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+
+       slave := util.StartServer(t, map[string]string{})
+       defer slave.Close()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+
+       ctx := context.Background()
+
+       t.Run("Full sync replication should work correctly", func(t *testing.T) 
{
+               value := strings.Repeat("a", 128*1024)
+               for i := 0; i < 1024; i++ {
+                       require.NoError(t, masterClient.Set(ctx, 
fmt.Sprintf("key%d", i), value, 0).Err())
+               }
+
+               util.SlaveOf(t, slaveClient, master)
+               // Wait longer for the full sync to avoid flaky tests in the CI environment
+               util.WaitForOffsetSync(t, masterClient, slaveClient, 
60*time.Second)
+
+               // Make sure the full sync happened in replication
+               syncFullCount, err := 
strconv.Atoi(util.FindInfoEntry(masterClient, "sync_full"))
+               require.NoError(t, err)
+               require.Greater(t, syncFullCount, 0)
+
+               got, err := slaveClient.Get(ctx, "key1").Result()
+               require.NoError(t, err)
+               require.Equal(t, value, got)
+
+               require.NoError(t, masterClient.Set(ctx, "foo", "bar", 0).Err())
+               util.WaitForOffsetSync(t, masterClient, slaveClient, 
5*time.Second)
+               require.Equal(t, "bar", slaveClient.Get(ctx, "foo").Val())
+       })
+}
diff --git a/tests/gocase/integration/rsid/rsid_test.go 
b/tests/gocase/integration/rsid/rsid_test.go
index 9f5a18ea..dd5ee137 100644
--- a/tests/gocase/integration/rsid/rsid_test.go
+++ b/tests/gocase/integration/rsid/rsid_test.go
@@ -74,7 +74,7 @@ func TestRSIDMasterAndReplicaYes(t *testing.T) {
 
        t.Run("chained replication can propagate updates", func(t *testing.T) {
                require.NoError(t, rdbB.Set(ctx, "master", "B", 0).Err())
-               util.WaitForOffsetSync(t, rdbB, rdbA)
+               util.WaitForOffsetSync(t, rdbB, rdbA, 5*time.Second)
                require.Equal(t, "B", rdbA.Get(ctx, "master").Val())
        })
 
diff --git a/tests/gocase/tls/tls_test.go b/tests/gocase/tls/tls_test.go
index 609f6479..8adba878 100644
--- a/tests/gocase/tls/tls_test.go
+++ b/tests/gocase/tls/tls_test.go
@@ -172,7 +172,7 @@ func TestTLSReplica(t *testing.T) {
                require.Equal(t, rc.Get(ctx, "b").Val(), "")
                require.NoError(t, sc.Set(ctx, "a", "1", 0).Err())
                require.NoError(t, sc.Set(ctx, "b", "2", 0).Err())
-               util.WaitForOffsetSync(t, sc, rc)
+               util.WaitForOffsetSync(t, sc, rc, 5*time.Second)
                require.Equal(t, rc.Get(ctx, "a").Val(), "1")
                require.Equal(t, rc.Get(ctx, "b").Val(), "2")
        })
@@ -189,7 +189,7 @@ func TestTLSReplica(t *testing.T) {
        defer func() { require.NoError(t, rc2.Close()) }()
 
        t.Run("TLS: Replication (full)", func(t *testing.T) {
-               util.WaitForOffsetSync(t, sc, rc2)
+               util.WaitForOffsetSync(t, sc, rc2, 5*time.Second)
                require.Equal(t, rc2.Get(ctx, "a").Val(), "1")
                require.Equal(t, rc2.Get(ctx, "b").Val(), "2")
                require.Equal(t, rc2.Get(ctx, "c").Val(), "3")
diff --git a/tests/gocase/unit/namespace/namespace_test.go 
b/tests/gocase/unit/namespace/namespace_test.go
index 755766bb..9409b61a 100644
--- a/tests/gocase/unit/namespace/namespace_test.go
+++ b/tests/gocase/unit/namespace/namespace_test.go
@@ -23,6 +23,7 @@ import (
        "context"
        "sync"
        "testing"
+       "time"
 
        "github.com/apache/kvrocks/tests/gocase/util"
        "github.com/redis/go-redis/v9"
@@ -181,7 +182,7 @@ func TestNamespaceReplicate(t *testing.T) {
                        require.NoError(t, r.Err())
                        require.Equal(t, "OK", r.Val())
                }
-               util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+               util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)
 
                // Can read namespaces on master
                for ns, token := range nsTokens {
@@ -211,7 +212,7 @@ func TestNamespaceReplicate(t *testing.T) {
                        require.NoError(t, r.Err())
                        require.Equal(t, "OK", r.Val())
                }
-               util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+               util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)
 
                for ns, token := range nsTokens {
                        r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
@@ -224,7 +225,7 @@ func TestNamespaceReplicate(t *testing.T) {
                        require.NoError(t, r.Err())
                        require.Equal(t, "OK", r.Val())
                }
-               util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+               util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)
 
                for ns := range nsTokens {
                        r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
diff --git a/tests/gocase/unit/scripting/scripting_test.go 
b/tests/gocase/unit/scripting/scripting_test.go
index 9e4f2752..1e97cad5 100644
--- a/tests/gocase/unit/scripting/scripting_test.go
+++ b/tests/gocase/unit/scripting/scripting_test.go
@@ -24,6 +24,7 @@ import (
        "fmt"
        "math/big"
        "testing"
+       "time"
 
        "github.com/apache/kvrocks/tests/gocase/util"
        "github.com/redis/go-redis/v9"
@@ -510,12 +511,12 @@ func TestScriptingMasterSlave(t *testing.T) {
        t.Run("SCRIPTING: script load on master, read on slave", func(t 
*testing.T) {
                sha := masterClient.ScriptLoad(ctx, `return 'script 
loaded'`).Val()
                require.Equal(t, "4167ea82ed9c381c7659f7cf93f394219147e8c4", 
sha)
-               util.WaitForOffsetSync(t, masterClient, slaveClient)
+               util.WaitForOffsetSync(t, masterClient, slaveClient, 
5*time.Second)
                require.Equal(t, []bool{true}, masterClient.ScriptExists(ctx, 
sha).Val())
                require.Equal(t, []bool{true}, slaveClient.ScriptExists(ctx, 
sha).Val())
 
                require.NoError(t, masterClient.ScriptFlush(ctx).Err())
-               util.WaitForOffsetSync(t, masterClient, slaveClient)
+               util.WaitForOffsetSync(t, masterClient, slaveClient, 
5*time.Second)
                require.Equal(t, []bool{false}, masterClient.ScriptExists(ctx, 
sha).Val())
                require.Equal(t, []bool{false}, slaveClient.ScriptExists(ctx, 
sha).Val())
        })
diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go
index 1e062854..6d4fb5bb 100644
--- a/tests/gocase/util/client.go
+++ b/tests/gocase/util/client.go
@@ -48,12 +48,12 @@ func WaitForSync(t testing.TB, slave *redis.Client) {
        }, 5*time.Second, 100*time.Millisecond)
 }
 
-func WaitForOffsetSync(t testing.TB, master, slave *redis.Client) {
+func WaitForOffsetSync(t testing.TB, master, slave *redis.Client, waitFor 
time.Duration) {
        require.Eventually(t, func() bool {
                o1 := FindInfoEntry(master, "master_repl_offset")
                o2 := FindInfoEntry(slave, "master_repl_offset")
                return o1 == o2
-       }, 5*time.Second, 100*time.Millisecond)
+       }, waitFor, 100*time.Millisecond)
 }
 
 func SlaveOf(t testing.TB, slave *redis.Client, master *KvrocksServer) {

Reply via email to