This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 618f338 Send probe requests in parallel within the same cluster (#158)
618f338 is described below
commit 618f33879858a51c195a364b2c407c99c36dc067
Author: hulk <[email protected]>
AuthorDate: Wed Apr 10 08:42:31 2024 +0800
Send probe requests in parallel within the same cluster (#158)
---
controller/failover/cluster.go | 47 +++++++-----
controller/probe/cluster.go | 158 ++++++++++++++++++++---------------------
util/redis.go | 8 ++-
3 files changed, 111 insertions(+), 102 deletions(-)
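
Before the diffs, a note on the shape of the change: the failover cluster's
shutdown path moves from a quit channel plus sync.Once to a cancellable
context guarded by an atomic flag, and the probe path fans the per-node
checks out into goroutines. A minimal, self-contained sketch of the new
lifecycle pattern, using illustrative names rather than the controller's
actual types:

    package main

    import (
        "context"
        "sync"
        "time"

        "go.uber.org/atomic"
    )

    // worker stands in for failover.Cluster: a background loop tied to a
    // cancellable context, with an atomic flag making Close idempotent.
    type worker struct {
        closed   atomic.Bool
        ctx      context.Context
        cancelFn context.CancelFunc
        wg       sync.WaitGroup
    }

    func newWorker() *worker {
        ctx, cancel := context.WithCancel(context.Background())
        w := &worker{ctx: ctx, cancelFn: cancel}
        w.wg.Add(1)
        go w.loop()
        return w
    }

    func (w *worker) loop() {
        defer w.wg.Done()
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-w.ctx.Done():
                return // Close canceled the context
            case <-ticker.C:
                // periodic work (pinging nodes, promoting masters, ...)
            }
        }
    }

    // Close is idempotent: only the first caller wins the CAS and cancels.
    func (w *worker) Close() {
        if !w.closed.CAS(false, true) {
            return
        }
        w.cancelFn()
    }

    func main() {
        w := newWorker()
        time.Sleep(300 * time.Millisecond)
        w.Close()
        w.wg.Wait() // block until loop has observed the cancellation
    }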
diff --git a/controller/failover/cluster.go b/controller/failover/cluster.go
index 28b0697..2100e07 100644
--- a/controller/failover/cluster.go
+++ b/controller/failover/cluster.go
@@ -23,6 +23,7 @@ import (
"sync"
"time"
+ "go.uber.org/atomic"
"golang.org/x/net/context"
"go.uber.org/zap"
@@ -49,31 +50,38 @@ type Cluster struct {
tasks map[string]*storage.FailoverTask
tasksIdx []string
- quitCh chan struct{}
- closeOnce sync.Once
- rw sync.RWMutex
+ closed atomic.Bool
+ ctx context.Context
+ cancelFn context.CancelFunc
+
+ rw sync.RWMutex
+ wg sync.WaitGroup
}
// NewCluster returns a Cluster instance and starts the schedule goroutine
func NewCluster(ns, cluster string, stor *storage.Storage, cfg *config.FailOverConfig) *Cluster {
- fn := &Cluster{
+ ctx, cancelFn := context.WithCancel(context.Background())
+ c := &Cluster{
namespace: ns,
cluster: cluster,
storage: stor,
tasks: make(map[string]*storage.FailoverTask),
- quitCh: make(chan struct{}),
config: cfg,
+ ctx: ctx,
+ cancelFn: cancelFn,
}
- go fn.loop()
- return fn
+
+ c.wg.Add(1)
+ go c.loop()
+ return c
}
// Close will release the resource when closing
-func (c *Cluster) Close() error {
- c.closeOnce.Do(func() {
- close(c.quitCh)
- })
- return nil
+func (c *Cluster) Close() {
+ if !c.closed.CAS(false, true) {
+ return
+ }
+ c.cancelFn()
}
func (c *Cluster) AddTask(task *storage.FailoverTask) error {
@@ -145,14 +153,17 @@ func (c *Cluster) purgeTasks() {
}
func (c *Cluster) loop() {
- ctx := context.Background()
+ defer c.wg.Done()
+
	ticker := time.NewTicker(time.Duration(c.config.PingIntervalSeconds) * time.Second)
defer ticker.Stop()
for {
select {
+ case <-c.ctx.Done():
+ return
case <-ticker.C:
c.rw.RLock()
-			nodesCount, err := c.storage.ClusterNodesCounts(ctx, c.namespace, c.cluster)
+			nodesCount, err := c.storage.ClusterNodesCounts(c.ctx, c.namespace, c.cluster)
if err != nil {
c.rw.RUnlock()
break
@@ -172,17 +183,15 @@ func (c *Cluster) loop() {
task := c.tasks[nodeAddr]
c.removeTask(idx)
if task.Type == ManualType {
- c.promoteMaster(ctx, task)
+ c.promoteMaster(c.ctx, task)
continue
}
-				if err := util.PingCmd(ctx, &task.Node); err == nil {
+				if err := util.PingCmd(c.ctx, &task.Node); err == nil {
continue
}
- c.promoteMaster(ctx, task)
+ c.promoteMaster(c.ctx, task)
}
c.rw.RUnlock()
- case <-c.quitCh:
- return
}
}
}
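
One thing the diff above sets up but does not use: Close only cancels the
context, while the new sync.WaitGroup is what would let a caller block until
loop has actually returned. A hypothetical helper (not part of this commit)
could pair the two:

    // Hypothetical, not in this commit: cancel, then block until the
    // background loop has observed ctx.Done() and returned.
    func (c *Cluster) closeAndWait() {
        c.Close()   // flips the atomic flag and calls cancelFn
        c.wg.Wait() // pairs with the wg.Add(1) in NewCluster
    }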
diff --git a/controller/probe/cluster.go b/controller/probe/cluster.go
index 3d073c2..6586383 100644
--- a/controller/probe/cluster.go
+++ b/controller/probe/cluster.go
@@ -22,6 +22,7 @@ package probe
import (
"context"
"errors"
+ "sync"
"time"
"github.com/apache/kvrocks-controller/controller/failover"
@@ -42,6 +43,7 @@ type Cluster struct {
cluster string
storage *storage.Storage
failOver *failover.FailOver
+ failureMu sync.Mutex
failureCounts map[string]int64
stopCh chan struct{}
}
@@ -61,93 +63,90 @@ func (c *Cluster) start() {
go c.loop()
}
-func (c *Cluster) probe(ctx context.Context, cluster *metadata.Cluster) (*metadata.Cluster, error) {
- var latestEpoch int64
- var latestNode *metadata.NodeInfo
+func (c *Cluster) probeNode(ctx context.Context, node *metadata.NodeInfo) (int64, error) {
+ info, err := util.ClusterInfoCmd(ctx, node)
+ if err != nil {
+ switch err.Error() {
+ case ErrRestoringBackUp.Error():
+ // The node is restoring from backup, just skip it
+ return -1, nil
+ case ErrClusterNotInitialized.Error():
+ return -1, ErrClusterNotInitialized
+ default:
+ return -1, err
+ }
+ }
+ return info.ClusterCurrentEpoch, nil
+}
- password := ""
- currentClusterStr, _ := cluster.ToSlotString()
- for index, shard := range cluster.Shards {
- for _, node := range shard.Nodes {
- logger := logger.Get().With(
- zap.String("id", node.ID),
- zap.String("role", node.Role),
- zap.String("addr", node.Addr),
- )
-			// all nodes in the cluster should have the same password,
- // so we just use the first node's password
- if password == "" {
- password = node.Password
- }
- if _, ok := c.failureCounts[node.Addr]; !ok {
- c.failureCounts[node.Addr] = 0
- }
- info, err := util.ClusterInfoCmd(ctx, &node)
- if err != nil {
- if err.Error() == ErrRestoringBackUp.Error() {
- continue
- }
-				if err.Error() == ErrClusterNotInitialized.Error() {
-					// Maybe the node was restarted, just re-sync the cluster info
-					clusterStr, _ := cluster.ToSlotString()
-					err = util.SyncClusterInfo2Node(ctx, &node, clusterStr, cluster.Version)
-					if err != nil {
-						logger.With(zap.Error(err)).Warn("Failed to re-sync the cluster info")
- }
- continue
- }
- c.failureCounts[node.Addr] += 1
-				if c.failureCounts[node.Addr]%c.failOver.Config().MaxPingCount == 0 {
-					err = c.failOver.AddNode(c.namespace, c.cluster, index, node, failover.AutoType)
-					logger.With(zap.Error(err)).Warn("Add the node into the fail over candidates")
- } else {
- logger.With(
- zap.Error(err),
- zap.Int64("failure_count",
c.failureCounts[node.Addr]),
- ).Warn("Failed to ping the node")
- }
- continue
- }
- if info.ClusterCurrentEpoch < cluster.Version {
-				err := util.SyncClusterInfo2Node(ctx, &node, currentClusterStr, cluster.Version)
- if err != nil {
- logger.With(
- zap.Error(err),
- zap.Int64("cluster_version",
cluster.Version),
- zap.Int64("node_version",
info.ClusterCurrentEpoch),
- ).Info("Failed to sync the cluster
info")
- }
- }
+func (c *Cluster) increaseFailureCount(index int, node *metadata.NodeInfo) {
+ log := logger.Get().With(
+ zap.String("id", node.ID),
+ zap.String("role", node.Role),
+ zap.String("addr", node.Addr),
+ )
- if info.ClusterMyEpoch > latestEpoch {
- latestEpoch = info.ClusterMyEpoch
- latestNode = &node
- }
- c.failureCounts[node.Addr] = 0
- }
+ c.failureMu.Lock()
+ if _, ok := c.failureCounts[node.Addr]; !ok {
+ c.failureCounts[node.Addr] = 0
}
+ c.failureCounts[node.Addr] += 1
+ count := c.failureCounts[node.Addr]
+ c.failureMu.Unlock()
- if latestEpoch > cluster.Version {
- latestClusterStr, err := util.ClusterNodesCmd(ctx, latestNode)
+ if count%c.failOver.Config().MaxPingCount == 0 {
+		err := c.failOver.AddNode(c.namespace, c.cluster, index, *node, failover.AutoType)
if err != nil {
- return nil, err
- }
-		latestClusterInfo, err := metadata.ParseCluster(latestClusterStr)
- if err != nil {
- return nil, err
+			log.With(zap.Error(err)).Warn("Failed to add the node into the fail over candidates")
+ return
}
- latestClusterInfo.SetPassword(password)
-		err = c.storage.UpdateCluster(ctx, c.namespace, latestClusterInfo)
- if err != nil {
- return nil, err
+ log.Info("Add the node into the fail over candidates")
+ }
+}
+
+func (c *Cluster) resetFailureCount(node *metadata.NodeInfo) {
+ c.failureMu.Lock()
+ delete(c.failureCounts, node.Addr)
+ c.failureMu.Unlock()
+}
+
+func (c *Cluster) probe(ctx context.Context, cluster *metadata.Cluster) {
+ for i, shard := range cluster.Shards {
+ for _, node := range shard.Nodes {
+ go func(shardIdx int, node metadata.NodeInfo) {
+ log := logger.Get().With(
+ zap.String("id", node.ID),
+ zap.String("role", node.Role),
+ zap.String("addr", node.Addr),
+ )
+ version, err := c.probeNode(ctx, &node)
+				if err != nil && !errors.Is(err, ErrClusterNotInitialized) {
+ c.increaseFailureCount(shardIdx, &node)
+					log.With(zap.Error(err)).Error("Failed to probe the node")
+ return
+ }
+ log.Debug("Probe the cluster node ")
+
+ if version < cluster.Version {
+					// sync the cluster to the latest version
+					err := util.SyncClusterInfo2Node(ctx, &node, cluster)
+ if err != nil {
+						log.With(zap.Error(err)).Error("Failed to sync the cluster info")
+ }
+ } else if version > cluster.Version {
+ log.With(
+ zap.Int64("node.version",
version),
+ zap.Int64("cluster.version",
cluster.Version),
+ ).Warn("The node is in a higher
version")
+ }
+ c.resetFailureCount(&node)
+ }(i, node)
}
- return latestClusterInfo, nil
}
- return cluster, nil
}
func (c *Cluster) loop() {
- logger := logger.Get().With(
+ log := logger.Get().With(
zap.String("namespace", c.namespace),
zap.String("cluster", c.cluster),
)
@@ -159,15 +158,12 @@ func (c *Cluster) loop() {
case <-probeTicker.C:
			clusterInfo, err := c.storage.GetClusterInfo(ctx, c.namespace, c.cluster)
if err != nil {
- logger.With(
+ log.With(
zap.Error(err),
).Error("Failed to get the cluster info from
the storage")
break
}
- if _, err := c.probe(ctx, clusterInfo); err != nil {
-			logger.With(zap.Error(err)).Error("Failed to probe the cluster")
- break
- }
+ c.probe(ctx, clusterInfo)
case <-c.stopCh:
return
}
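
The rewritten probe above launches one goroutine per node, which turns
failureCounts into shared state; that is why increaseFailureCount and
resetFailureCount now serialize access through failureMu. Passing node as a
goroutine argument (go func(shardIdx int, node metadata.NodeInfo)) also
copies the loop variable, avoiding the capture bug in Go versions before
1.22. A standalone sketch of the fan-out shape, with a WaitGroup added for
illustration since the commit itself fires the goroutines and returns
without waiting:

    package main

    import (
        "context"
        "log"
        "sync"
    )

    // prober mirrors the new shape: a failure-counter map shared by many
    // probe goroutines, so every access goes through a mutex.
    type prober struct {
        mu            sync.Mutex
        failureCounts map[string]int64
    }

    func (p *prober) fail(addr string) int64 {
        p.mu.Lock()
        defer p.mu.Unlock()
        p.failureCounts[addr]++
        return p.failureCounts[addr]
    }

    func (p *prober) reset(addr string) {
        p.mu.Lock()
        defer p.mu.Unlock()
        delete(p.failureCounts, addr)
    }

    // probeAll fans out one goroutine per address; ping stands in for
    // util.ClusterInfoCmd.
    func (p *prober) probeAll(ctx context.Context, addrs []string,
        ping func(context.Context, string) error) {
        var wg sync.WaitGroup
        for _, addr := range addrs {
            wg.Add(1)
            go func(addr string) { // addr is copied, like node in the diff
                defer wg.Done()
                if err := ping(ctx, addr); err != nil {
                    log.Printf("probe %s failed, count=%d: %v", addr, p.fail(addr), err)
                    return
                }
                p.reset(addr)
            }(addr)
        }
        wg.Wait() // illustration only; the commit does not wait
    }

    func main() {
        p := &prober{failureCounts: make(map[string]int64)}
        ok := func(context.Context, string) error { return nil }
        p.probeAll(context.Background(), []string{"127.0.0.1:6666"}, ok)
    }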
diff --git a/util/redis.go b/util/redis.go
index b90b528..12045a0 100644
--- a/util/redis.go
+++ b/util/redis.go
@@ -370,7 +370,11 @@ func NodeInfoCmd(ctx context.Context, node *metadata.NodeInfo) (*NodeInfo, error
return nodeInfo, nil
}
-func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, clusterStr string, ver int64) error {
+func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, cluster *metadata.Cluster) error {
+ clusterStr, err := cluster.ToSlotString()
+ if err != nil {
+ return err
+ }
cli, err := GetRedisClient(ctx, node)
if err != nil {
return err
@@ -379,7 +383,7 @@ func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, clusterS
if err != nil {
return err
}
- err = cli.Do(ctx, "CLUSTERX", "setnodes", clusterStr, ver).Err()
+	err = cli.Do(ctx, "CLUSTERX", "setnodes", clusterStr, cluster.Version).Err()
if err != nil {
return err
}
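
With the new signature, callers hand SyncClusterInfo2Node the whole
metadata.Cluster, and the helper derives both the slot string and the
version from the same value, so the two can no longer drift apart. A usage
sketch, assuming node and cluster were already loaded from storage:

    // The helper now calls cluster.ToSlotString() internally before
    // issuing CLUSTERX SETNODES with cluster.Version against the node.
    if err := util.SyncClusterInfo2Node(ctx, &node, cluster); err != nil {
        log.With(zap.Error(err)).Error("Failed to sync the cluster info")
    }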