This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 4433b17  Fix flaky test in controller module (#170)
4433b17 is described below

commit 4433b17aa6d8b32ae2e4718e0c62adb51c968e46
Author: hulk <[email protected]>
AuthorDate: Mon Apr 29 20:48:01 2024 +0800

    Fix flaky test in controller module (#170)
---
 controller/cluster.go         |  8 ++++----
 controller/cluster_test.go    |  6 +++---
 controller/controller.go      | 17 ++++++++++++-----
 controller/controller_test.go |  2 +-
 server/server.go              |  1 +
 store/engine/engine.go        | 16 ++++++++++++++--
 6 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/controller/cluster.go b/controller/cluster.go
index 27fcc6b..87a28c6 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -36,13 +36,13 @@ var (
        ErrRestoringBackUp       = errors.New("ERR LOADING kvrocks is restoring 
the db from backup")
 )
 
-type ClusterOptions struct {
+type ClusterCheckOptions struct {
        pingInterval    time.Duration
        maxFailureCount int64
 }
 
 type ClusterChecker struct {
-       options      ClusterOptions
+       options      ClusterCheckOptions
        clusterStore store.Store
        clusterMu    sync.Mutex
        cluster      *store.Cluster
@@ -60,14 +60,14 @@ type ClusterChecker struct {
        wg sync.WaitGroup
 }
 
-func NewClusterProbe(s store.Store, ns, cluster string) *ClusterChecker {
+func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker {
        ctx, cancel := context.WithCancel(context.Background())
        c := &ClusterChecker{
                namespace:   ns,
                clusterName: cluster,
 
                clusterStore: s,
-               options: ClusterOptions{
+               options: ClusterCheckOptions{
                        pingInterval:    time.Second * 3,
                        maxFailureCount: 5,
                },
diff --git a/controller/cluster_test.go b/controller/cluster_test.go
index 56b2d91..65a8ac6 100644
--- a/controller/cluster_test.go
+++ b/controller/cluster_test.go
@@ -120,7 +120,7 @@ func TestCluster_FailureCount(t *testing.T) {
                clusterStore: s,
                namespace:    ns,
                clusterName:  clusterName,
-               options: ClusterOptions{
+               options: ClusterCheckOptions{
                        pingInterval:    time.Second,
                        maxFailureCount: 3,
                },
@@ -173,7 +173,7 @@ func TestCluster_LoadAndProbe(t *testing.T) {
        s := NewMockClusterStore()
        require.NoError(t, s.CreateCluster(ctx, ns, cluster))
 
-       clusterProbe := NewClusterProbe(s, ns, clusterName)
+       clusterProbe := NewClusterChecker(s, ns, clusterName)
        clusterProbe.WithPingInterval(100 * time.Millisecond)
        clusterProbe.Start()
        defer clusterProbe.Close()
@@ -223,7 +223,7 @@ func TestCluster_MigrateSlot(t *testing.T) {
        s := NewMockClusterStore()
        require.NoError(t, s.CreateCluster(ctx, ns, cluster))
 
-       clusterProbe := NewClusterProbe(s, ns, clusterName)
+       clusterProbe := NewClusterChecker(s, ns, clusterName)
        clusterProbe.WithPingInterval(100 * time.Millisecond)
        clusterProbe.Start()
        defer clusterProbe.Close()
diff --git a/controller/controller.go b/controller/controller.go
index 4e6ec99..933553d 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -72,10 +72,12 @@ func (c *Controller) Start(ctx context.Context) error {
 
        c.wg.Add(1)
        go c.syncLoop(ctx)
+       c.wg.Add(1)
+       go c.leaderEventLoop()
        return nil
 }
 
-func (c *Controller) WaitReady() {
+func (c *Controller) WaitForReady() {
        <-c.readyCh
 }
 
@@ -122,7 +124,6 @@ func (c *Controller) syncLoop(ctx context.Context) {
        defer c.wg.Done()
 
        prevTermLeader := ""
-       go c.leaderEventLoop()
        if c.clusterStore.IsLeader() {
                c.becomeLeader(ctx, prevTermLeader)
        }
@@ -148,6 +149,7 @@ func (c *Controller) syncLoop(ctx context.Context) {
 }
 
 func (c *Controller) leaderEventLoop() {
+       defer c.wg.Done()
        for {
                select {
                case event := <-c.clusterStore.Notify():
@@ -176,7 +178,11 @@ func (c *Controller) buildClusterKey(namespace, clusterName string) string {
 
 func (c *Controller) addCluster(namespace, clusterName string) {
        key := c.buildClusterKey(namespace, clusterName)
-       cluster := NewClusterProbe(c.clusterStore, namespace, clusterName).
+       if cluster, err := c.getCluster(namespace, clusterName); err == nil && 
cluster != nil {
+               return
+       }
+
+       cluster := NewClusterChecker(c.clusterStore, namespace, clusterName).
                
WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) * 
time.Second).
                WithMaxFailureCount(c.config.FailOver.MaxPingCount)
        cluster.Start()
@@ -188,9 +194,10 @@ func (c *Controller) addCluster(namespace, clusterName string) {
 
 func (c *Controller) getCluster(namespace, clusterName string) 
(*ClusterChecker, error) {
        key := c.buildClusterKey(namespace, clusterName)
+
        c.mu.Lock()
+       defer c.mu.Unlock()
        cluster, ok := c.clusters[key]
-       c.mu.Unlock()
        if !ok {
                return nil, consts.ErrNotFound
        }
@@ -202,8 +209,8 @@ func (c *Controller) removeCluster(namespace, clusterName string) {
        c.mu.Lock()
        if cluster, ok := c.clusters[key]; ok {
                cluster.Close()
+               delete(c.clusters, key)
        }
-       delete(c.clusters, key)
        c.mu.Unlock()
 }
 
diff --git a/controller/controller_test.go b/controller/controller_test.go
index a0cc862..0f7828f 100644
--- a/controller/controller_test.go
+++ b/controller/controller_test.go
@@ -56,7 +56,7 @@ func TestController_Basics(t *testing.T) {
                c.Close()
        }()
 
-       c.WaitReady()
+       c.WaitForReady()
 
        t.Run("get cluster", func(t *testing.T) {
                cluster, err := c.getCluster(ns, "test-cluster-0")
diff --git a/server/server.go b/server/server.go
index d4b745c..65f5ae9 100644
--- a/server/server.go
+++ b/server/server.go
@@ -121,6 +121,7 @@ func (srv *Server) Start(ctx context.Context) error {
        if err := srv.controller.Start(ctx); err != nil {
                return err
        }
+       srv.controller.WaitForReady()
        srv.startAPIServer()
        return nil
 }
diff --git a/store/engine/engine.go b/store/engine/engine.go
index 5052bd0..7a264af 100644
--- a/store/engine/engine.go
+++ b/store/engine/engine.go
@@ -92,11 +92,23 @@ func (m *Mock) Delete(ctx context.Context, key string) error {
 func (m *Mock) List(ctx context.Context, prefix string) ([]Entry, error) {
        m.mu.Lock()
        defer m.mu.Unlock()
+
+       exists := make(map[string]bool, 0)
        var entries []Entry
        for k, v := range m.values {
-               if k == prefix || (len(k) > len(prefix) && k[len(prefix)] == 
'/') {
+               if strings.HasPrefix(k, prefix) {
+                       k = strings.Trim(strings.TrimPrefix(k, prefix), "/")
+                       fields := strings.SplitN(k, "/", 2)
+                       if len(fields) == 2 {
+                               // only list the first level
+                               k = fields[0]
+                       }
+                       if _, ok := exists[k]; ok {
+                               continue
+                       }
+                       exists[k] = true
                        entries = append(entries, Entry{
-                               Key:   strings.TrimPrefix(k, prefix+"/"),
+                               Key:   k,
                                Value: []byte(v),
                        })
                }

Reply via email to