This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 2836bf02 chore(tests): add basic test case for the full+partial sync
(#2522)
2836bf02 is described below
commit 2836bf02877fa9853af6047dc35b206dea7330f6
Author: hulk <[email protected]>
AuthorDate: Fri Sep 6 12:08:44 2024 +0800
chore(tests): add basic test case for the full+partial sync (#2522)
---
tests/gocase/integration/cluster/cluster_test.go | 4 +-
.../integration/replication/replication_test.go | 50 ++++++++++++++++++++--
tests/gocase/integration/rsid/rsid_test.go | 2 +-
tests/gocase/tls/tls_test.go | 4 +-
tests/gocase/unit/namespace/namespace_test.go | 7 +--
tests/gocase/unit/scripting/scripting_test.go | 5 ++-
tests/gocase/util/client.go | 4 +-
7 files changed, 61 insertions(+), 15 deletions(-)
diff --git a/tests/gocase/integration/cluster/cluster_test.go
b/tests/gocase/integration/cluster/cluster_test.go
index 541c2a99..3c75e9ff 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -399,7 +399,7 @@ func TestClusterMultiple(t *testing.T) {
// request replicas a write command, it's wrong
require.ErrorContains(t, rdb[3].Set(ctx, util.SlotTable[16383],
16383, 0).Err(), "MOVED")
// request a read-only command to node3 that serve slot 16383,
that's ok
- util.WaitForOffsetSync(t, rdb[2], rdb[3])
+ util.WaitForOffsetSync(t, rdb[2], rdb[3], 5*time.Second)
//the default option is READWRITE, which will redirect both
read and write to master
require.ErrorContains(t, rdb[3].Get(ctx,
util.SlotTable[16383]).Err(), "MOVED")
@@ -445,7 +445,7 @@ func TestClusterMultiple(t *testing.T) {
require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())
require.NoError(t, rdb[2].Set(ctx, util.SlotTable[8192], 8192,
0).Err())
- util.WaitForOffsetSync(t, rdb[2], rdb[3])
+ util.WaitForOffsetSync(t, rdb[2], rdb[3], 5*time.Second)
// request node3 that serves slot 8192, that's ok
require.Equal(t, "8192", rdb[3].Get(ctx,
util.SlotTable[8192]).Val())
diff --git a/tests/gocase/integration/replication/replication_test.go
b/tests/gocase/integration/replication/replication_test.go
index 7adc9862..bb88f022 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -66,7 +66,7 @@ func TestClusterReplication(t *testing.T) {
require.Equal(t, "slave", util.FindInfoEntry(replicaClient,
"role"))
masterClient.Set(ctx, "k0", "v0", 0)
masterClient.LPush(ctx, "k1", "e0", "e1", "e2")
- util.WaitForOffsetSync(t, masterClient, replicaClient)
+ util.WaitForOffsetSync(t, masterClient, replicaClient,
5*time.Second)
require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val())
require.Equal(t, []string{"e2", "e1", "e0"},
replicaClient.LRange(ctx, "k1", 0, -1).Val())
@@ -84,7 +84,7 @@ func TestClusterReplication(t *testing.T) {
// allow to run the read-only command in the replica
require.NoError(t, replicaClient.ReadOnly(ctx).Err())
- util.WaitForOffsetSync(t, masterClient, replicaClient)
+ util.WaitForOffsetSync(t, masterClient, replicaClient,
5*time.Second)
require.Equal(t, "v1", replicaClient.Get(ctx, "k0").Val())
require.Equal(t, map[string]string{"f0": "v0", "f1": "v1"},
replicaClient.HGetAll(ctx, "k2").Val())
})
@@ -387,7 +387,7 @@ func TestReplicationContinueRunning(t *testing.T) {
"0": 0, "1": 1, "2": 2, "3": 3, "4": 4, "5": 5, "6": 6,
"7": 7, "8": 8, "9": 9,
"a": "a", "b": "b", "c": "c", "d": "d", "e": "e", "f":
"f", "g": "g", "h": "h", "i": "i", "j": "j", "k": "k"})
require.EqualValues(t, 21, masterClient.HLen(ctx,
"myhash").Val())
- util.WaitForOffsetSync(t, masterClient, slaveClient)
+ util.WaitForOffsetSync(t, masterClient, slaveClient,
5*time.Second)
require.Equal(t, "1", slaveClient.HGet(ctx, "myhash",
"1").Val())
require.Equal(t, "a", slaveClient.HGet(ctx, "myhash",
"a").Val())
})
@@ -508,3 +508,47 @@ func TestShouldNotReplicate(t *testing.T) {
require.Equal(t, "master", util.FindInfoEntry(masterClient,
"role"))
})
}
+
+func TestFullSyncReplication(t *testing.T) {
+ master := util.StartServer(t, map[string]string{
+ "rocksdb.write_buffer_size": "4",
+ "rocksdb.target_file_size_base": "16",
+ "rocksdb.max_write_buffer_number": "1",
+ "rocksdb.wal_ttl_seconds": "0",
+ "rocksdb.wal_size_limit_mb": "0",
+ })
+ defer master.Close()
+ masterClient := master.NewClient()
+ defer func() { require.NoError(t, masterClient.Close()) }()
+
+ slave := util.StartServer(t, map[string]string{})
+ defer slave.Close()
+ slaveClient := slave.NewClient()
+ defer func() { require.NoError(t, slaveClient.Close()) }()
+
+ ctx := context.Background()
+
+ t.Run("Full sync replication should work correctly", func(t *testing.T)
{
+ value := strings.Repeat("a", 128*1024)
+ for i := 0; i < 1024; i++ {
+ require.NoError(t, masterClient.Set(ctx,
fmt.Sprintf("key%d", i), value, 0).Err())
+ }
+
+ util.SlaveOf(t, slaveClient, master)
+ // Wait more time for full sync to avoid flake test in CI
environment
+ util.WaitForOffsetSync(t, masterClient, slaveClient,
60*time.Second)
+
+ // Make sure the full sync happened in replication
+ syncFullCount, err :=
strconv.Atoi(util.FindInfoEntry(masterClient, "sync_full"))
+ require.NoError(t, err)
+ require.Greater(t, syncFullCount, 0)
+
+ got, err := slaveClient.Get(ctx, "key1").Result()
+ require.NoError(t, err)
+ require.Equal(t, value, got)
+
+ require.NoError(t, masterClient.Set(ctx, "foo", "bar", 0).Err())
+ util.WaitForOffsetSync(t, masterClient, slaveClient,
5*time.Second)
+ require.Equal(t, "bar", slaveClient.Get(ctx, "foo").Val())
+ })
+}
diff --git a/tests/gocase/integration/rsid/rsid_test.go
b/tests/gocase/integration/rsid/rsid_test.go
index 9f5a18ea..dd5ee137 100644
--- a/tests/gocase/integration/rsid/rsid_test.go
+++ b/tests/gocase/integration/rsid/rsid_test.go
@@ -74,7 +74,7 @@ func TestRSIDMasterAndReplicaYes(t *testing.T) {
t.Run("chained replication can propagate updates", func(t *testing.T) {
require.NoError(t, rdbB.Set(ctx, "master", "B", 0).Err())
- util.WaitForOffsetSync(t, rdbB, rdbA)
+ util.WaitForOffsetSync(t, rdbB, rdbA, 5*time.Second)
require.Equal(t, "B", rdbA.Get(ctx, "master").Val())
})
diff --git a/tests/gocase/tls/tls_test.go b/tests/gocase/tls/tls_test.go
index 609f6479..8adba878 100644
--- a/tests/gocase/tls/tls_test.go
+++ b/tests/gocase/tls/tls_test.go
@@ -172,7 +172,7 @@ func TestTLSReplica(t *testing.T) {
require.Equal(t, rc.Get(ctx, "b").Val(), "")
require.NoError(t, sc.Set(ctx, "a", "1", 0).Err())
require.NoError(t, sc.Set(ctx, "b", "2", 0).Err())
- util.WaitForOffsetSync(t, sc, rc)
+ util.WaitForOffsetSync(t, sc, rc, 5*time.Second)
require.Equal(t, rc.Get(ctx, "a").Val(), "1")
require.Equal(t, rc.Get(ctx, "b").Val(), "2")
})
@@ -189,7 +189,7 @@ func TestTLSReplica(t *testing.T) {
defer func() { require.NoError(t, rc2.Close()) }()
t.Run("TLS: Replication (full)", func(t *testing.T) {
- util.WaitForOffsetSync(t, sc, rc2)
+ util.WaitForOffsetSync(t, sc, rc2, 5*time.Second)
require.Equal(t, rc2.Get(ctx, "a").Val(), "1")
require.Equal(t, rc2.Get(ctx, "b").Val(), "2")
require.Equal(t, rc2.Get(ctx, "c").Val(), "3")
diff --git a/tests/gocase/unit/namespace/namespace_test.go
b/tests/gocase/unit/namespace/namespace_test.go
index 755766bb..9409b61a 100644
--- a/tests/gocase/unit/namespace/namespace_test.go
+++ b/tests/gocase/unit/namespace/namespace_test.go
@@ -23,6 +23,7 @@ import (
"context"
"sync"
"testing"
+ "time"
"github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
@@ -181,7 +182,7 @@ func TestNamespaceReplicate(t *testing.T) {
require.NoError(t, r.Err())
require.Equal(t, "OK", r.Val())
}
- util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+ util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)
// Can read namespaces on master
for ns, token := range nsTokens {
@@ -211,7 +212,7 @@ func TestNamespaceReplicate(t *testing.T) {
require.NoError(t, r.Err())
require.Equal(t, "OK", r.Val())
}
- util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+ util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)
for ns, token := range nsTokens {
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
@@ -224,7 +225,7 @@ func TestNamespaceReplicate(t *testing.T) {
require.NoError(t, r.Err())
require.Equal(t, "OK", r.Val())
}
- util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+ util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)
for ns := range nsTokens {
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
diff --git a/tests/gocase/unit/scripting/scripting_test.go
b/tests/gocase/unit/scripting/scripting_test.go
index 9e4f2752..1e97cad5 100644
--- a/tests/gocase/unit/scripting/scripting_test.go
+++ b/tests/gocase/unit/scripting/scripting_test.go
@@ -24,6 +24,7 @@ import (
"fmt"
"math/big"
"testing"
+ "time"
"github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
@@ -510,12 +511,12 @@ func TestScriptingMasterSlave(t *testing.T) {
t.Run("SCRIPTING: script load on master, read on slave", func(t
*testing.T) {
sha := masterClient.ScriptLoad(ctx, `return 'script
loaded'`).Val()
require.Equal(t, "4167ea82ed9c381c7659f7cf93f394219147e8c4",
sha)
- util.WaitForOffsetSync(t, masterClient, slaveClient)
+ util.WaitForOffsetSync(t, masterClient, slaveClient,
5*time.Second)
require.Equal(t, []bool{true}, masterClient.ScriptExists(ctx,
sha).Val())
require.Equal(t, []bool{true}, slaveClient.ScriptExists(ctx,
sha).Val())
require.NoError(t, masterClient.ScriptFlush(ctx).Err())
- util.WaitForOffsetSync(t, masterClient, slaveClient)
+ util.WaitForOffsetSync(t, masterClient, slaveClient,
5*time.Second)
require.Equal(t, []bool{false}, masterClient.ScriptExists(ctx,
sha).Val())
require.Equal(t, []bool{false}, slaveClient.ScriptExists(ctx,
sha).Val())
})
diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go
index 1e062854..6d4fb5bb 100644
--- a/tests/gocase/util/client.go
+++ b/tests/gocase/util/client.go
@@ -48,12 +48,12 @@ func WaitForSync(t testing.TB, slave *redis.Client) {
}, 5*time.Second, 100*time.Millisecond)
}
-func WaitForOffsetSync(t testing.TB, master, slave *redis.Client) {
+func WaitForOffsetSync(t testing.TB, master, slave *redis.Client, waitFor
time.Duration) {
require.Eventually(t, func() bool {
o1 := FindInfoEntry(master, "master_repl_offset")
o2 := FindInfoEntry(slave, "master_repl_offset")
return o1 == o2
- }, 5*time.Second, 100*time.Millisecond)
+ }, waitFor, 100*time.Millisecond)
}
func SlaveOf(t testing.TB, slave *redis.Client, master *KvrocksServer) {