This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 292d84f  Support the latest cluster info from the node which version 
larger than local cluster version (#271)
292d84f is described below

commit 292d84ffe085c0c813777f713521e86e976bb6cc
Author: Raphael <[email protected]>
AuthorDate: Fri Mar 28 10:17:40 2025 +0800

    Support the latest cluster info from the node which version larger than 
local cluster version (#271)
---
 controller/cluster.go | 39 ++++++++++++++++++++++++++++++++++++++-
 store/cluster_node.go |  2 ++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/controller/cluster.go b/controller/cluster.go
index c613d60..15caf76 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -194,9 +194,16 @@ func (c *ClusterChecker) syncClusterToNodes(ctx 
context.Context) error {
 }
 
 func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster 
*store.Cluster) {
+       var mu sync.Mutex
+       var latestNodeVersion int64 = 0
+       var latestClusterNodesStr string
+       var wg sync.WaitGroup
+
        for i, shard := range cluster.Shards {
                for _, node := range shard.Nodes {
+                       wg.Add(1)
                        go func(shardIdx int, n store.Node) {
+                               defer wg.Done()
                                log := logger.Get().With(
                                        zap.String("id", n.ID()),
                                        zap.Bool("is_master", n.IsMaster()),
@@ -223,16 +230,46 @@ func (c *ClusterChecker) parallelProbeNodes(ctx 
context.Context, cluster *store.
                                        if err := n.SyncClusterInfo(ctx, 
cluster); err != nil {
                                                
log.With(zap.Error(err)).Error("Failed to sync the clusterName info")
                                        }
-                               } else if version > cluster.Version.Load() {
+                               } else if version > clusterVersion {
                                        log.With(
                                                zap.Int64("node.version", 
version),
                                                
zap.Int64("clusterName.version", clusterVersion),
                                        ).Warn("The node is in a higher 
version")
+                                       mu.Lock()
+                                       if version > latestNodeVersion {
+                                               latestNodeVersion = version
+                                               clusterNodesStr, errX := 
n.GetClusterNodesString(ctx)
+                                               if errX != nil {
+                                                       
log.With(zap.String("node", n.ID()), zap.Error(errX)).Error("Failed to get the 
cluster nodes info from node")
+                                                       // set empty explicitly
+                                                       latestClusterNodesStr = 
""
+                                               } else {
+                                                       latestClusterNodesStr = 
clusterNodesStr
+                                               }
+                                       }
+                                       mu.Unlock()
                                }
                                c.resetFailureCount(n.ID())
                        }(i, node)
                }
        }
+
+       wg.Wait()
+       if latestNodeVersion > cluster.Version.Load() && latestClusterNodesStr 
!= "" {
+               latestClusterInfo, err := 
store.ParseCluster(latestClusterNodesStr)
+               if err != nil {
+                       logger.Get().With(zap.String("cluster", 
latestClusterNodesStr), zap.Error(err)).Error("Failed to parse the cluster 
info")
+                       return
+               }
+               latestClusterInfo.Name = cluster.Name
+               
latestClusterInfo.SetPassword(cluster.Shards[0].Nodes[0].Password())
+               err = c.clusterStore.UpdateCluster(ctx, c.namespace, 
latestClusterInfo)
+               if err != nil {
+                       logger.Get().With(zap.String("cluster", 
latestClusterNodesStr), zap.Error(err)).Error("Failed to update the cluster 
info")
+                       return
+               }
+               logger.Get().With(zap.Any("latestClusterInfo", 
latestClusterInfo)).Info("Refresh latest cluster info to all nodes")
+       }
 }
 
 func (c *ClusterChecker) probeLoop() {
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 33274a5..3434595 100644
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -72,6 +72,8 @@ type Node interface {
 
        MarshalJSON() ([]byte, error)
        UnmarshalJSON(data []byte) error
+
+       GetClusterNodesString(ctx context.Context) (string, error)
 }
 
 type ClusterNode struct {

Reply via email to