This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 618f338  Parallel send the probe request in the same cluster (#158)
618f338 is described below

commit 618f33879858a51c195a364b2c407c99c36dc067
Author: hulk <[email protected]>
AuthorDate: Wed Apr 10 08:42:31 2024 +0800

    Parallel send the probe request in the same cluster (#158)
---
 controller/failover/cluster.go |  47 +++++++-----
 controller/probe/cluster.go    | 158 ++++++++++++++++++++---------------------
 util/redis.go                  |   8 ++-
 3 files changed, 111 insertions(+), 102 deletions(-)

diff --git a/controller/failover/cluster.go b/controller/failover/cluster.go
index 28b0697..2100e07 100644
--- a/controller/failover/cluster.go
+++ b/controller/failover/cluster.go
@@ -23,6 +23,7 @@ import (
        "sync"
        "time"
 
+       "go.uber.org/atomic"
        "golang.org/x/net/context"
 
        "go.uber.org/zap"
@@ -49,31 +50,38 @@ type Cluster struct {
        tasks     map[string]*storage.FailoverTask
        tasksIdx  []string
 
-       quitCh    chan struct{}
-       closeOnce sync.Once
-       rw        sync.RWMutex
+       closed   atomic.Bool
+       ctx      context.Context
+       cancelFn context.CancelFunc
+
+       rw sync.RWMutex
+       wg sync.WaitGroup
 }
 
 // NewCluster return a Cluster instance and start schedule goroutine
 func NewCluster(ns, cluster string, stor *storage.Storage, cfg *config.FailOverConfig) *Cluster {
-       fn := &Cluster{
+       ctx, cancelFn := context.WithCancel(context.Background())
+       c := &Cluster{
                namespace: ns,
                cluster:   cluster,
                storage:   stor,
                tasks:     make(map[string]*storage.FailoverTask),
-               quitCh:    make(chan struct{}),
                config:    cfg,
+               ctx:       ctx,
+               cancelFn:  cancelFn,
        }
-       go fn.loop()
-       return fn
+
+       c.wg.Add(1)
+       go c.loop()
+       return c
 }
 
 // Close will release the resource when closing
-func (c *Cluster) Close() error {
-       c.closeOnce.Do(func() {
-               close(c.quitCh)
-       })
-       return nil
+func (c *Cluster) Close() {
+       if !c.closed.CAS(false, true) {
+               return
+       }
+       c.cancelFn()
 }
 
 func (c *Cluster) AddTask(task *storage.FailoverTask) error {
@@ -145,14 +153,17 @@ func (c *Cluster) purgeTasks() {
 }
 
 func (c *Cluster) loop() {
-       ctx := context.Background()
+       defer c.wg.Done()
+
        ticker := time.NewTicker(time.Duration(c.config.PingIntervalSeconds) * time.Second)
        defer ticker.Stop()
        for {
                select {
+               case <-c.ctx.Done():
+                       return
                case <-ticker.C:
                        c.rw.RLock()
-                       nodesCount, err := c.storage.ClusterNodesCounts(ctx, c.namespace, c.cluster)
+                       nodesCount, err := c.storage.ClusterNodesCounts(c.ctx, c.namespace, c.cluster)
                        if err != nil {
                                c.rw.RUnlock()
                                break
@@ -172,17 +183,15 @@ func (c *Cluster) loop() {
                                task := c.tasks[nodeAddr]
                                c.removeTask(idx)
                                if task.Type == ManualType {
-                                       c.promoteMaster(ctx, task)
+                                       c.promoteMaster(c.ctx, task)
                                        continue
                                }
-                               if err := util.PingCmd(ctx, &task.Node); err == nil {
+                               if err := util.PingCmd(c.ctx, &task.Node); err == nil {
                                        continue
                                }
-                               c.promoteMaster(ctx, task)
+                               c.promoteMaster(c.ctx, task)
                        }
                        c.rw.RUnlock()
-               case <-c.quitCh:
-                       return
                }
        }
 }
diff --git a/controller/probe/cluster.go b/controller/probe/cluster.go
index 3d073c2..6586383 100644
--- a/controller/probe/cluster.go
+++ b/controller/probe/cluster.go
@@ -22,6 +22,7 @@ package probe
 import (
        "context"
        "errors"
+       "sync"
        "time"
 
        "github.com/apache/kvrocks-controller/controller/failover"
@@ -42,6 +43,7 @@ type Cluster struct {
        cluster       string
        storage       *storage.Storage
        failOver      *failover.FailOver
+       failureMu     sync.Mutex
        failureCounts map[string]int64
        stopCh        chan struct{}
 }
@@ -61,93 +63,90 @@ func (c *Cluster) start() {
        go c.loop()
 }
 
-func (c *Cluster) probe(ctx context.Context, cluster *metadata.Cluster) (*metadata.Cluster, error) {
-       var latestEpoch int64
-       var latestNode *metadata.NodeInfo
+func (c *Cluster) probeNode(ctx context.Context, node *metadata.NodeInfo) (int64, error) {
+       info, err := util.ClusterInfoCmd(ctx, node)
+       if err != nil {
+               switch err.Error() {
+               case ErrRestoringBackUp.Error():
+                       // The node is restoring from backup, just skip it
+                       return -1, nil
+               case ErrClusterNotInitialized.Error():
+                       return -1, ErrClusterNotInitialized
+               default:
+                       return -1, err
+               }
+       }
+       return info.ClusterCurrentEpoch, nil
+}
 
-       password := ""
-       currentClusterStr, _ := cluster.ToSlotString()
-       for index, shard := range cluster.Shards {
-               for _, node := range shard.Nodes {
-                       logger := logger.Get().With(
-                               zap.String("id", node.ID),
-                               zap.String("role", node.Role),
-                               zap.String("addr", node.Addr),
-                       )
-                       // all nodes in the cluster should have the same password,
-                       // so we just use the first node's password
-                       if password == "" {
-                               password = node.Password
-                       }
-                       if _, ok := c.failureCounts[node.Addr]; !ok {
-                               c.failureCounts[node.Addr] = 0
-                       }
-                       info, err := util.ClusterInfoCmd(ctx, &node)
-                       if err != nil {
-                               if err.Error() == ErrRestoringBackUp.Error() {
-                                       continue
-                               }
-                               if err.Error() == ErrClusterNotInitialized.Error() {
-                                       // Maybe the node was restarted, just re-sync the cluster info
-                                       clusterStr, _ := cluster.ToSlotString()
-                                       err = util.SyncClusterInfo2Node(ctx, &node, clusterStr, cluster.Version)
-                                       if err != nil {
-                                               logger.With(zap.Error(err)).Warn("Failed to re-sync the cluster info")
-                                       }
-                                       continue
-                               }
-                               c.failureCounts[node.Addr] += 1
-                               if c.failureCounts[node.Addr]%c.failOver.Config().MaxPingCount == 0 {
-                                       err = c.failOver.AddNode(c.namespace, c.cluster, index, node, failover.AutoType)
-                                       logger.With(zap.Error(err)).Warn("Add the node into the fail over candidates")
-                               } else {
-                                       logger.With(
-                                               zap.Error(err),
-                                               zap.Int64("failure_count", c.failureCounts[node.Addr]),
-                                       ).Warn("Failed to ping the node")
-                               }
-                               continue
-                       }
-                       if info.ClusterCurrentEpoch < cluster.Version {
-                               err := util.SyncClusterInfo2Node(ctx, &node, currentClusterStr, cluster.Version)
-                               if err != nil {
-                                       logger.With(
-                                               zap.Error(err),
-                                               zap.Int64("cluster_version", cluster.Version),
-                                               zap.Int64("node_version", info.ClusterCurrentEpoch),
-                                       ).Info("Failed to sync the cluster info")
-                               }
-                       }
+func (c *Cluster) increaseFailureCount(index int, node *metadata.NodeInfo) {
+       log := logger.Get().With(
+               zap.String("id", node.ID),
+               zap.String("role", node.Role),
+               zap.String("addr", node.Addr),
+       )
 
-                       if info.ClusterMyEpoch > latestEpoch {
-                               latestEpoch = info.ClusterMyEpoch
-                               latestNode = &node
-                       }
-                       c.failureCounts[node.Addr] = 0
-               }
+       c.failureMu.Lock()
+       if _, ok := c.failureCounts[node.Addr]; !ok {
+               c.failureCounts[node.Addr] = 0
        }
+       c.failureCounts[node.Addr] += 1
+       count := c.failureCounts[node.Addr]
+       c.failureMu.Unlock()
 
-       if latestEpoch > cluster.Version {
-               latestClusterStr, err := util.ClusterNodesCmd(ctx, latestNode)
+       if count%c.failOver.Config().MaxPingCount == 0 {
+               err := c.failOver.AddNode(c.namespace, c.cluster, index, *node, failover.AutoType)
                if err != nil {
-                       return nil, err
-               }
-               latestClusterInfo, err := metadata.ParseCluster(latestClusterStr)
-               if err != nil {
-                       return nil, err
+                       log.With(zap.Error(err)).Warn("Failed to add the node into the fail over candidates")
+                       return
                }
-               latestClusterInfo.SetPassword(password)
-               err = c.storage.UpdateCluster(ctx, c.namespace, latestClusterInfo)
-               if err != nil {
-                       return nil, err
+               log.Info("Add the node into the fail over candidates")
+       }
+}
+
+func (c *Cluster) resetFailureCount(node *metadata.NodeInfo) {
+       c.failureMu.Lock()
+       delete(c.failureCounts, node.Addr)
+       c.failureMu.Unlock()
+}
+
+func (c *Cluster) probe(ctx context.Context, cluster *metadata.Cluster) {
+       for i, shard := range cluster.Shards {
+               for _, node := range shard.Nodes {
+                       go func(shardIdx int, node metadata.NodeInfo) {
+                               log := logger.Get().With(
+                                       zap.String("id", node.ID),
+                                       zap.String("role", node.Role),
+                                       zap.String("addr", node.Addr),
+                               )
+                               version, err := c.probeNode(ctx, &node)
+                               if err != nil && !errors.Is(err, ErrClusterNotInitialized) {
+                                       c.increaseFailureCount(shardIdx, &node)
+                                       log.With(zap.Error(err)).Error("Failed to probe the node")
+                                       return
+                               }
+                               log.Debug("Probe the cluster node ")
+
+                               if version < cluster.Version {
+                                       // sync the cluster to the latest version
+                                       err := util.SyncClusterInfo2Node(ctx, &node, cluster)
+                                       if err != nil {
+                                               log.With(zap.Error(err)).Error("Failed to sync the cluster info")
+                                       }
+                               } else if version > cluster.Version {
+                                       log.With(
+                                               zap.Int64("node.version", version),
+                                               zap.Int64("cluster.version", cluster.Version),
+                                       ).Warn("The node is in a higher version")
+                               }
+                               c.resetFailureCount(&node)
+                       }(i, node)
                }
-               return latestClusterInfo, nil
        }
-       return cluster, nil
 }
 
 func (c *Cluster) loop() {
-       logger := logger.Get().With(
+       log := logger.Get().With(
                zap.String("namespace", c.namespace),
                zap.String("cluster", c.cluster),
        )
@@ -159,15 +158,12 @@ func (c *Cluster) loop() {
                case <-probeTicker.C:
                        clusterInfo, err := c.storage.GetClusterInfo(ctx, c.namespace, c.cluster)
                        if err != nil {
-                               logger.With(
+                               log.With(
                                        zap.Error(err),
                                ).Error("Failed to get the cluster info from the storage")
                                break
                        }
-                       if _, err := c.probe(ctx, clusterInfo); err != nil {
-                               logger.With(zap.Error(err)).Error("Failed to probe the cluster")
-                               break
-                       }
+                       c.probe(ctx, clusterInfo)
                case <-c.stopCh:
                        return
                }
diff --git a/util/redis.go b/util/redis.go
index b90b528..12045a0 100644
--- a/util/redis.go
+++ b/util/redis.go
@@ -370,7 +370,11 @@ func NodeInfoCmd(ctx context.Context, node *metadata.NodeInfo) (*NodeInfo, error
        return nodeInfo, nil
 }
 
-func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, clusterStr string, ver int64) error {
+func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, cluster *metadata.Cluster) error {
+       clusterStr, err := cluster.ToSlotString()
+       if err != nil {
+               return err
+       }
        cli, err := GetRedisClient(ctx, node)
        if err != nil {
                return err
@@ -379,7 +383,7 @@ func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, clusterS
        if err != nil {
                return err
        }
-       err = cli.Do(ctx, "CLUSTERX", "setnodes", clusterStr, ver).Err()
+       err = cli.Do(ctx, "CLUSTERX", "setnodes", clusterStr, cluster.Version).Err()
        if err != nil {
                return err
        }

Reply via email to