This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 7cda4208 test(integration): add integration test for master lost 
during syncing sst files. (#2691)
7cda4208 is described below

commit 7cda4208750f6beaeeccbf243cfe4f803796dd0f
Author: Edward Xu <[email protected]>
AuthorDate: Thu Dec 12 14:39:34 2024 +0800

    test(integration): add integration test for master lost during syncing sst 
files. (#2691)
---
 tests/gocase/go.mod                                |  1 +
 tests/gocase/go.sum                                |  6 +-
 .../integration/replication/replication_test.go    | 57 +++++++++++++++
 tests/gocase/util/client.go                        | 80 ++++++++++++++++++++++
 4 files changed, 140 insertions(+), 4 deletions(-)

diff --git a/tests/gocase/go.mod b/tests/gocase/go.mod
index 21fb651f..aa664226 100644
--- a/tests/gocase/go.mod
+++ b/tests/gocase/go.mod
@@ -23,6 +23,7 @@ require (
        github.com/tklauser/go-sysconf v0.3.14 // indirect
        github.com/tklauser/numcpus v0.9.0 // indirect
        github.com/yusufpapurcu/wmi v1.2.4 // indirect
+       golang.org/x/sync v0.10.0
        golang.org/x/sys v0.27.0 // indirect
        gopkg.in/yaml.v3 v3.0.1 // indirect
 )
diff --git a/tests/gocase/go.sum b/tests/gocase/go.sum
index 19ce5b37..b153085f 100644
--- a/tests/gocase/go.sum
+++ b/tests/gocase/go.sum
@@ -29,8 +29,6 @@ github.com/shoenig/go-m1cpu v0.1.6 
h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
 github.com/shoenig/go-m1cpu v0.1.6/go.mod 
h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
 github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
 github.com/shoenig/test v0.6.4/go.mod 
h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
-github.com/stretchr/testify v1.9.0 
h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
-github.com/stretchr/testify v1.9.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/stretchr/testify v1.10.0 
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/tklauser/go-sysconf v0.3.14 
h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU=
@@ -39,10 +37,10 @@ github.com/tklauser/numcpus v0.9.0 
h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPD
 github.com/tklauser/numcpus v0.9.0/go.mod 
h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI=
 github.com/yusufpapurcu/wmi v1.2.4 
h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
 github.com/yusufpapurcu/wmi v1.2.4/go.mod 
h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
-golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c 
h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
-golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod 
h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
 golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f 
h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo=
 golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod 
h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak=
+golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
+golang.org/x/sync v0.10.0/go.mod 
h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
diff --git a/tests/gocase/integration/replication/replication_test.go 
b/tests/gocase/integration/replication/replication_test.go
index bb88f022..ac01e1b9 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -552,3 +552,60 @@ func TestFullSyncReplication(t *testing.T) {
                require.Equal(t, "bar", slaveClient.Get(ctx, "foo").Val())
        })
 }
+
+func TestSlaveLostMaster(t *testing.T) {
+       // integration test for #2662 and #2671
+       ctx := context.Background()
+
+       masterSrv := util.StartServer(t, map[string]string{
+               "cluster-enabled":               "yes",
+               "max-replication-mb":            "1",
+               "rocksdb.compression":           "no",
+               "rocksdb.write_buffer_size":     "1",
+               "rocksdb.target_file_size_base": "1",
+       })
+       defer func() { masterSrv.Close() }()
+       masterClient := masterSrv.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterNodeID).Err())
+
+       replicaSrv := util.StartServer(t, map[string]string{
+               "cluster-enabled":                "yes",
+               "replication-connect-timeout-ms": "5000",
+               "replication-recv-timeout-ms":    "5100",
+       })
+       defer func() { replicaSrv.Close() }()
+       replicaClient := replicaSrv.NewClient()
+       // allow to run the read-only command in the replica
+       require.NoError(t, replicaClient.ReadOnly(ctx).Err())
+       defer func() { require.NoError(t, replicaClient.Close()) }()
+       replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", 
replicaNodeID).Err())
+
+       proxyCtx, cancelProxy := context.WithCancel(ctx)
+       newMasterPort := util.SimpleTCPProxy(proxyCtx, t, 
fmt.Sprintf("127.0.0.1:%d", masterSrv.Port()), true)
+
+       masterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s 
127.0.0.1 %d slave %s",
+               masterNodeID, masterSrv.Port(), replicaNodeID, 
replicaSrv.Port(), masterNodeID)
+       clusterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s 
127.0.0.1 %d slave %s",
+               masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(), 
masterNodeID)
+       unexistNodesInfo := fmt.Sprintf("%s 127.0.0.2 %d master - 0-16383\n%s 
127.0.0.1 %d slave %s",
+               masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(), 
masterNodeID)
+
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
masterNodesInfo, "1").Err())
+       value := strings.Repeat("a", 128*1024)
+
+       for i := 0; i < 1024; i++ {
+               require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", 
i), value, 0).Err())
+       }
+
+       require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodesInfo, "1").Err())
+
+       time.Sleep(2 * time.Second)
+       cancelProxy()
+       start := time.Now()
+       require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", 
unexistNodesInfo, "2").Err())
+       duration := time.Since(start)
+       require.Less(t, duration, time.Second*6)
+}
diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go
index 6d4fb5bb..180163b5 100644
--- a/tests/gocase/util/client.go
+++ b/tests/gocase/util/client.go
@@ -21,7 +21,10 @@ package util
 
 import (
        "context"
+       "errors"
        "fmt"
+       "io"
+       "net"
        "regexp"
        "strings"
        "testing"
@@ -29,6 +32,7 @@ import (
 
        "github.com/redis/go-redis/v9"
        "github.com/stretchr/testify/require"
+       "golang.org/x/sync/errgroup"
 )
 
 func FindInfoEntry(rdb *redis.Client, key string, section ...string) string {
@@ -71,3 +75,79 @@ func Populate(t testing.TB, rdb *redis.Client, prefix 
string, n, size int) {
        _, err := p.Exec(ctx)
        require.NoError(t, err)
 }
+
+func SimpleTCPProxy(ctx context.Context, t testing.TB, to string, slowdown 
bool) uint64 {
+       addr, err := findFreePort()
+       if err != nil {
+               t.Fatalf("can't find a free port, %v", err)
+       }
+       from := addr.String()
+
+       listener, err := net.Listen("tcp", from)
+       if err != nil {
+               t.Fatalf("listen to %s failed, err: %v", from, err)
+       }
+
+       copyBytes := func(src, dest io.ReadWriter) func() error {
+               buffer := make([]byte, 4096)
+               return func() error {
+               COPY_LOOP:
+                       for {
+                               select {
+                               case <-ctx.Done():
+                                       t.Log("forwarding tcp stream stopped")
+                                       break COPY_LOOP
+                               default:
+                                       if slowdown {
+                                               time.Sleep(time.Millisecond * 
100)
+                                       }
+                                       n, err := src.Read(buffer)
+                                       if err != nil {
+                                               if errors.Is(err, io.EOF) {
+                                                       break COPY_LOOP
+                                               }
+                                               return err
+                                       }
+                                       _, err = dest.Write(buffer[:n])
+                                       if err != nil {
+                                               if errors.Is(err, io.EOF) {
+                                                       break COPY_LOOP
+                                               }
+                                               return err
+                                       }
+                               }
+                       }
+                       return nil
+               }
+       }
+
+       go func() {
+               defer listener.Close()
+       LISTEN_LOOP:
+               for {
+                       select {
+                       case <-ctx.Done():
+                               break LISTEN_LOOP
+
+                       default:
+                               conn, err := listener.Accept()
+                               if err != nil {
+                                       t.Fatalf("accept conn failed, err: %v", 
err)
+                               }
+                               dest, err := net.Dial("tcp", to)
+                               if err != nil {
+                                       t.Fatalf("accept conn failed, err: %v", 
err)
+                               }
+                               var errGrp errgroup.Group
+                               errGrp.Go(copyBytes(conn, dest))
+                               errGrp.Go(copyBytes(dest, conn))
+                               err = errGrp.Wait()
+                               if err != nil {
+                                       t.Fatalf("forward tcp stream failed, 
err: %v", err)
+                               }
+
+                       }
+               }
+       }()
+       return uint64(addr.Port)
+}

Reply via email to