This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 4433b17 Fix flaky test in controller module (#170)
4433b17 is described below
commit 4433b17aa6d8b32ae2e4718e0c62adb51c968e46
Author: hulk <[email protected]>
AuthorDate: Mon Apr 29 20:48:01 2024 +0800
Fix flaky test in controller module (#170)
---
controller/cluster.go | 8 ++++----
controller/cluster_test.go | 6 +++---
controller/controller.go | 17 ++++++++++++-----
controller/controller_test.go | 2 +-
server/server.go | 1 +
store/engine/engine.go | 16 ++++++++++++++--
6 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index 27fcc6b..87a28c6 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -36,13 +36,13 @@ var (
ErrRestoringBackUp = errors.New("ERR LOADING kvrocks is restoring
the db from backup")
)
-type ClusterOptions struct {
+type ClusterCheckOptions struct {
pingInterval time.Duration
maxFailureCount int64
}
type ClusterChecker struct {
- options ClusterOptions
+ options ClusterCheckOptions
clusterStore store.Store
clusterMu sync.Mutex
cluster *store.Cluster
@@ -60,14 +60,14 @@ type ClusterChecker struct {
wg sync.WaitGroup
}
-func NewClusterProbe(s store.Store, ns, cluster string) *ClusterChecker {
+func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker {
ctx, cancel := context.WithCancel(context.Background())
c := &ClusterChecker{
namespace: ns,
clusterName: cluster,
clusterStore: s,
- options: ClusterOptions{
+ options: ClusterCheckOptions{
pingInterval: time.Second * 3,
maxFailureCount: 5,
},
diff --git a/controller/cluster_test.go b/controller/cluster_test.go
index 56b2d91..65a8ac6 100644
--- a/controller/cluster_test.go
+++ b/controller/cluster_test.go
@@ -120,7 +120,7 @@ func TestCluster_FailureCount(t *testing.T) {
clusterStore: s,
namespace: ns,
clusterName: clusterName,
- options: ClusterOptions{
+ options: ClusterCheckOptions{
pingInterval: time.Second,
maxFailureCount: 3,
},
@@ -173,7 +173,7 @@ func TestCluster_LoadAndProbe(t *testing.T) {
s := NewMockClusterStore()
require.NoError(t, s.CreateCluster(ctx, ns, cluster))
- clusterProbe := NewClusterProbe(s, ns, clusterName)
+ clusterProbe := NewClusterChecker(s, ns, clusterName)
clusterProbe.WithPingInterval(100 * time.Millisecond)
clusterProbe.Start()
defer clusterProbe.Close()
@@ -223,7 +223,7 @@ func TestCluster_MigrateSlot(t *testing.T) {
s := NewMockClusterStore()
require.NoError(t, s.CreateCluster(ctx, ns, cluster))
- clusterProbe := NewClusterProbe(s, ns, clusterName)
+ clusterProbe := NewClusterChecker(s, ns, clusterName)
clusterProbe.WithPingInterval(100 * time.Millisecond)
clusterProbe.Start()
defer clusterProbe.Close()
diff --git a/controller/controller.go b/controller/controller.go
index 4e6ec99..933553d 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -72,10 +72,12 @@ func (c *Controller) Start(ctx context.Context) error {
c.wg.Add(1)
go c.syncLoop(ctx)
+ c.wg.Add(1)
+ go c.leaderEventLoop()
return nil
}
-func (c *Controller) WaitReady() {
+func (c *Controller) WaitForReady() {
<-c.readyCh
}
@@ -122,7 +124,6 @@ func (c *Controller) syncLoop(ctx context.Context) {
defer c.wg.Done()
prevTermLeader := ""
- go c.leaderEventLoop()
if c.clusterStore.IsLeader() {
c.becomeLeader(ctx, prevTermLeader)
}
@@ -148,6 +149,7 @@ func (c *Controller) syncLoop(ctx context.Context) {
}
func (c *Controller) leaderEventLoop() {
+ defer c.wg.Done()
for {
select {
case event := <-c.clusterStore.Notify():
@@ -176,7 +178,11 @@ func (c *Controller) buildClusterKey(namespace,
clusterName string) string {
func (c *Controller) addCluster(namespace, clusterName string) {
key := c.buildClusterKey(namespace, clusterName)
- cluster := NewClusterProbe(c.clusterStore, namespace, clusterName).
+ if cluster, err := c.getCluster(namespace, clusterName); err == nil &&
cluster != nil {
+ return
+ }
+
+ cluster := NewClusterChecker(c.clusterStore, namespace, clusterName).
WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) *
time.Second).
WithMaxFailureCount(c.config.FailOver.MaxPingCount)
cluster.Start()
@@ -188,9 +194,10 @@ func (c *Controller) addCluster(namespace, clusterName
string) {
func (c *Controller) getCluster(namespace, clusterName string)
(*ClusterChecker, error) {
key := c.buildClusterKey(namespace, clusterName)
+
c.mu.Lock()
+ defer c.mu.Unlock()
cluster, ok := c.clusters[key]
- c.mu.Unlock()
if !ok {
return nil, consts.ErrNotFound
}
@@ -202,8 +209,8 @@ func (c *Controller) removeCluster(namespace, clusterName
string) {
c.mu.Lock()
if cluster, ok := c.clusters[key]; ok {
cluster.Close()
+ delete(c.clusters, key)
}
- delete(c.clusters, key)
c.mu.Unlock()
}
diff --git a/controller/controller_test.go b/controller/controller_test.go
index a0cc862..0f7828f 100644
--- a/controller/controller_test.go
+++ b/controller/controller_test.go
@@ -56,7 +56,7 @@ func TestController_Basics(t *testing.T) {
c.Close()
}()
- c.WaitReady()
+ c.WaitForReady()
t.Run("get cluster", func(t *testing.T) {
cluster, err := c.getCluster(ns, "test-cluster-0")
diff --git a/server/server.go b/server/server.go
index d4b745c..65f5ae9 100644
--- a/server/server.go
+++ b/server/server.go
@@ -121,6 +121,7 @@ func (srv *Server) Start(ctx context.Context) error {
if err := srv.controller.Start(ctx); err != nil {
return err
}
+ srv.controller.WaitForReady()
srv.startAPIServer()
return nil
}
diff --git a/store/engine/engine.go b/store/engine/engine.go
index 5052bd0..7a264af 100644
--- a/store/engine/engine.go
+++ b/store/engine/engine.go
@@ -92,11 +92,23 @@ func (m *Mock) Delete(ctx context.Context, key string)
error {
func (m *Mock) List(ctx context.Context, prefix string) ([]Entry, error) {
m.mu.Lock()
defer m.mu.Unlock()
+
+ exists := make(map[string]bool, 0)
var entries []Entry
for k, v := range m.values {
- if k == prefix || (len(k) > len(prefix) && k[len(prefix)] ==
'/') {
+ if strings.HasPrefix(k, prefix) {
+ k = strings.Trim(strings.TrimPrefix(k, prefix), "/")
+ fields := strings.SplitN(k, "/", 2)
+ if len(fields) == 2 {
+ // only list the first level
+ k = fields[0]
+ }
+ if _, ok := exists[k]; ok {
+ continue
+ }
+ exists[k] = true
entries = append(entries, Entry{
- Key: strings.TrimPrefix(k, prefix+"/"),
+ Key: k,
Value: []byte(v),
})
}