This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 292d84f Support the latest cluster info from the node whose version is
larger than the local cluster version (#271)
292d84f is described below
commit 292d84ffe085c0c813777f713521e86e976bb6cc
Author: Raphael <[email protected]>
AuthorDate: Fri Mar 28 10:17:40 2025 +0800
Support the latest cluster info from the node whose version is larger than
the local cluster version (#271)
---
controller/cluster.go | 39 ++++++++++++++++++++++++++++++++++++++-
store/cluster_node.go | 2 ++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index c613d60..15caf76 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -194,9 +194,16 @@ func (c *ClusterChecker) syncClusterToNodes(ctx
context.Context) error {
}
func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster
*store.Cluster) {
+ var mu sync.Mutex
+ var latestNodeVersion int64 = 0
+ var latestClusterNodesStr string
+ var wg sync.WaitGroup
+
for i, shard := range cluster.Shards {
for _, node := range shard.Nodes {
+ wg.Add(1)
go func(shardIdx int, n store.Node) {
+ defer wg.Done()
log := logger.Get().With(
zap.String("id", n.ID()),
zap.Bool("is_master", n.IsMaster()),
@@ -223,16 +230,46 @@ func (c *ClusterChecker) parallelProbeNodes(ctx
context.Context, cluster *store.
if err := n.SyncClusterInfo(ctx,
cluster); err != nil {
log.With(zap.Error(err)).Error("Failed to sync the clusterName info")
}
- } else if version > cluster.Version.Load() {
+ } else if version > clusterVersion {
log.With(
zap.Int64("node.version",
version),
zap.Int64("clusterName.version", clusterVersion),
).Warn("The node is in a higher
version")
+ mu.Lock()
+ if version > latestNodeVersion {
+ latestNodeVersion = version
+ clusterNodesStr, errX :=
n.GetClusterNodesString(ctx)
+ if errX != nil {
+
log.With(zap.String("node", n.ID()), zap.Error(errX)).Error("Failed to get the
cluster nodes info from node")
+ // set empty explicitly
+ latestClusterNodesStr =
""
+ } else {
+ latestClusterNodesStr =
clusterNodesStr
+ }
+ }
+ mu.Unlock()
}
c.resetFailureCount(n.ID())
}(i, node)
}
}
+
+ wg.Wait()
+ if latestNodeVersion > cluster.Version.Load() && latestClusterNodesStr
!= "" {
+ latestClusterInfo, err :=
store.ParseCluster(latestClusterNodesStr)
+ if err != nil {
+ logger.Get().With(zap.String("cluster",
latestClusterNodesStr), zap.Error(err)).Error("Failed to parse the cluster
info")
+ return
+ }
+ latestClusterInfo.Name = cluster.Name
+
latestClusterInfo.SetPassword(cluster.Shards[0].Nodes[0].Password())
+ err = c.clusterStore.UpdateCluster(ctx, c.namespace,
latestClusterInfo)
+ if err != nil {
+ logger.Get().With(zap.String("cluster",
latestClusterNodesStr), zap.Error(err)).Error("Failed to update the cluster
info")
+ return
+ }
+ logger.Get().With(zap.Any("latestClusterInfo",
latestClusterInfo)).Info("Refresh latest cluster info to all nodes")
+ }
}
func (c *ClusterChecker) probeLoop() {
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 33274a5..3434595 100644
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -72,6 +72,8 @@ type Node interface {
MarshalJSON() ([]byte, error)
UnmarshalJSON(data []byte) error
+
+ GetClusterNodesString(ctx context.Context) (string, error)
}
type ClusterNode struct {