This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 7cda4208 test(integration): add integration test for master lost
during syncing sst files. (#2691)
7cda4208 is described below
commit 7cda4208750f6beaeeccbf243cfe4f803796dd0f
Author: Edward Xu <[email protected]>
AuthorDate: Thu Dec 12 14:39:34 2024 +0800
test(integration): add integration test for master lost during syncing sst
files. (#2691)
---
tests/gocase/go.mod | 1 +
tests/gocase/go.sum | 6 +-
.../integration/replication/replication_test.go | 57 +++++++++++++++
tests/gocase/util/client.go | 80 ++++++++++++++++++++++
4 files changed, 140 insertions(+), 4 deletions(-)
diff --git a/tests/gocase/go.mod b/tests/gocase/go.mod
index 21fb651f..aa664226 100644
--- a/tests/gocase/go.mod
+++ b/tests/gocase/go.mod
@@ -23,6 +23,7 @@ require (
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.9.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
+ golang.org/x/sync v0.10.0
golang.org/x/sys v0.27.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
diff --git a/tests/gocase/go.sum b/tests/gocase/go.sum
index 19ce5b37..b153085f 100644
--- a/tests/gocase/go.sum
+++ b/tests/gocase/go.sum
@@ -29,8 +29,6 @@ github.com/shoenig/go-m1cpu v0.1.6
h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
github.com/shoenig/go-m1cpu v0.1.6/go.mod
h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod
h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
-github.com/stretchr/testify v1.9.0
h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
-github.com/stretchr/testify v1.9.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tklauser/go-sysconf v0.3.14
h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU=
@@ -39,10 +37,10 @@ github.com/tklauser/numcpus v0.9.0
h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPD
github.com/tklauser/numcpus v0.9.0/go.mod
h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI=
github.com/yusufpapurcu/wmi v1.2.4
h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod
h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
-golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
-golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod
h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f
h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo=
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod
h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak=
+golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
+golang.org/x/sync v0.10.0/go.mod
h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
diff --git a/tests/gocase/integration/replication/replication_test.go
b/tests/gocase/integration/replication/replication_test.go
index bb88f022..ac01e1b9 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -552,3 +552,60 @@ func TestFullSyncReplication(t *testing.T) {
require.Equal(t, "bar", slaveClient.Get(ctx, "foo").Val())
})
}
+
+// TestSlaveLostMaster starts a master and a replica in cluster mode, points
+// the replica at the master through a slowed-down TCP proxy, stops the proxy,
+// and then asserts that re-pointing the replica with CLUSTERX SETNODES
+// returns in under six seconds.
+func TestSlaveLostMaster(t *testing.T) {
+ // integration test for #2662 and #2671
+ ctx := context.Background()
+
+ // NOTE(review): the tiny replication bandwidth, write buffer and target
+ // file size presumably make the full sync slow and split it over many
+ // SST files -- confirm against the server's config documentation.
+ masterSrv := util.StartServer(t, map[string]string{
+ "cluster-enabled": "yes",
+ "max-replication-mb": "1",
+ "rocksdb.compression": "no",
+ "rocksdb.write_buffer_size": "1",
+ "rocksdb.target_file_size_base": "1",
+ })
+ defer func() { masterSrv.Close() }()
+ masterClient := masterSrv.NewClient()
+ defer func() { require.NoError(t, masterClient.Close()) }()
+ masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID",
masterNodeID).Err())
+
+ // The replica's connect/recv timeouts (5000 ms / 5100 ms) sit just below
+ // the 6 second bound asserted at the end of this test.
+ replicaSrv := util.StartServer(t, map[string]string{
+ "cluster-enabled": "yes",
+ "replication-connect-timeout-ms": "5000",
+ "replication-recv-timeout-ms": "5100",
+ })
+ defer func() { replicaSrv.Close() }()
+ replicaClient := replicaSrv.NewClient()
+ // allow read-only commands to run on the replica
+ require.NoError(t, replicaClient.ReadOnly(ctx).Err())
+ defer func() { require.NoError(t, replicaClient.Close()) }()
+ replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID",
replicaNodeID).Err())
+
+ // Proxy in front of the master (slowdown enabled); cancelProxy stops it.
+ proxyCtx, cancelProxy := context.WithCancel(ctx)
+ newMasterPort := util.SimpleTCPProxy(proxyCtx, t,
fmt.Sprintf("127.0.0.1:%d", masterSrv.Port()), true)
+
+ // Three topologies that differ only in the master's address:
+ //   masterNodesInfo  - the master's real port (applied to the master itself)
+ //   clusterNodesInfo - the proxy port on 127.0.0.1 (applied to the replica)
+ //   unexistNodesInfo - the proxy port on 127.0.0.2 (applied after the proxy stops)
+ masterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s
127.0.0.1 %d slave %s",
+ masterNodeID, masterSrv.Port(), replicaNodeID,
replicaSrv.Port(), masterNodeID)
+ clusterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s
127.0.0.1 %d slave %s",
+ masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(),
masterNodeID)
+ unexistNodesInfo := fmt.Sprintf("%s 127.0.0.2 %d master - 0-16383\n%s
127.0.0.1 %d slave %s",
+ masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(),
masterNodeID)
+
+ require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES",
masterNodesInfo, "1").Err())
+ // Write 1024 keys of 128 KiB each (128 MiB of values) to the master
+ // before the replica is attached.
+ value := strings.Repeat("a", 128*1024)
+
+ for i := 0; i < 1024; i++ {
+ require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d",
i), value, 0).Err())
+ }
+
+ // Attach the replica; its master address is the proxy port.
+ require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES",
clusterNodesInfo, "1").Err())
+
+ // NOTE(review): the fixed 2 second sleep presumably lets the sync get
+ // under way before the proxy is stopped -- confirm; it is timing-sensitive.
+ time.Sleep(2 * time.Second)
+ cancelProxy()
+ // Time how long the replica takes to accept a topology (version "2")
+ // whose master address is 127.0.0.2.
+ start := time.Now()
+ require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES",
unexistNodesInfo, "2").Err())
+ duration := time.Since(start)
+ require.Less(t, duration, time.Second*6)
+}
diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go
index 6d4fb5bb..180163b5 100644
--- a/tests/gocase/util/client.go
+++ b/tests/gocase/util/client.go
@@ -21,7 +21,10 @@ package util
import (
"context"
+ "errors"
"fmt"
+ "io"
+ "net"
"regexp"
"strings"
"testing"
@@ -29,6 +32,7 @@ import (
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
+ "golang.org/x/sync/errgroup"
)
func FindInfoEntry(rdb *redis.Client, key string, section ...string) string {
@@ -71,3 +75,79 @@ func Populate(t testing.TB, rdb *redis.Client, prefix
string, n, size int) {
_, err := p.Exec(ctx)
require.NoError(t, err)
}
+
+// SimpleTCPProxy listens on a free local port and forwards TCP traffic between
+// clients of that port and the address given by to. It returns the port the
+// proxy listens on.
+//
+// Connections are served one at a time: the next Accept happens only after
+// both directions of the current connection have stopped forwarding. When
+// slowdown is true, every read is preceded by a 100ms sleep.
+//
+// Cancelling ctx stops the proxy, but cancellation is only observed between
+// reads and between connections; a goroutine blocked in Read or Accept does
+// not notice it until that call returns.
+//
+// NOTE(review): the proxied connections are never closed on cancellation.
+// This looks intentional, so that peers see a stalled link instead of an EOF
+// -- confirm before adding any Close calls here.
+//
+// Failures to find a port or to listen abort the test via t.Fatalf. Failures
+// inside the background goroutine are reported with t.Errorf, because
+// FailNow/Fatalf must only be called from the goroutine running the test.
+func SimpleTCPProxy(ctx context.Context, t testing.TB, to string, slowdown bool) uint64 {
+	addr, err := findFreePort()
+	if err != nil {
+		t.Fatalf("can't find a free port, %v", err)
+	}
+	from := addr.String()
+
+	listener, err := net.Listen("tcp", from)
+	if err != nil {
+		t.Fatalf("listen to %s failed, err: %v", from, err)
+	}
+
+	// copyBytes returns a task that pumps bytes from src to dest until ctx is
+	// cancelled, src reports EOF, or an I/O error occurs.
+	copyBytes := func(src io.Reader, dest io.Writer) func() error {
+		buffer := make([]byte, 4096)
+		return func() error {
+			for {
+				select {
+				case <-ctx.Done():
+					t.Log("forwarding tcp stream stopped")
+					return nil
+				default:
+				}
+
+				if slowdown {
+					time.Sleep(100 * time.Millisecond)
+				}
+				n, err := src.Read(buffer)
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+				if _, err = dest.Write(buffer[:n]); err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+			}
+		}
+	}
+
+	go func() {
+		defer listener.Close()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+
+			conn, err := listener.Accept()
+			if err != nil {
+				// Not t.Fatalf: this is not the test goroutine.
+				t.Errorf("accept conn failed, err: %v", err)
+				return
+			}
+			dest, err := net.Dial("tcp", to)
+			if err != nil {
+				// Nothing will ever serve this client; release it.
+				_ = conn.Close()
+				t.Errorf("dial to %s failed, err: %v", to, err)
+				return
+			}
+
+			// Forward both directions and wait for both to finish before
+			// accepting the next connection.
+			var errGrp errgroup.Group
+			errGrp.Go(copyBytes(conn, dest))
+			errGrp.Go(copyBytes(dest, conn))
+			if err := errGrp.Wait(); err != nil {
+				t.Errorf("forward tcp stream failed, err: %v", err)
+				return
+			}
+		}
+	}()
+	return uint64(addr.Port)
+}