This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new f848508 Fix should update cluster information to the remote store
first (#237)
f848508 is described below
commit f84850816bb4d18193854ec32a31f55e6c00cc5f
Author: hulk <[email protected]>
AuthorDate: Mon Dec 23 23:12:28 2024 +0800
Fix should update cluster information to the remote store first (#237)
Currently, cluster migration updates the local cluster before the remote store,
which might cause the local cluster to stop checking the migration status even
when writing to the remote store failed.
---
controller/cluster.go | 45 +++++++++++++++++++++++++++------------------
store/cluster.go | 12 ++++++++++++
store/cluster_shard.go | 11 +++++++++++
3 files changed, 50 insertions(+), 18 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index 4f44953..2b975d2 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -266,12 +266,18 @@ func (c *ClusterChecker) probeLoop() {
}
}
-func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster
*store.Cluster) {
+func (c *ClusterChecker) updateCluster(cluster *store.Cluster) {
+ c.clusterMu.Lock()
+ c.cluster = cluster
+ c.clusterMu.Unlock()
+}
+
+func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context,
clonedCluster *store.Cluster) {
log := logger.Get().With(
zap.String("namespace", c.namespace),
zap.String("cluster", c.clusterName))
- for i, shard := range cluster.Shards {
+ for i, shard := range clonedCluster.Shards {
if !shard.IsMigrating() {
continue
}
@@ -283,38 +289,41 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, cluster *
}
if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
log.Error("Mismatch migrate slot", zap.Int("slot",
shard.MigratingSlot))
+ return
}
- if shard.TargetShardIndex < 0 || shard.TargetShardIndex >=
len(cluster.Shards) {
+ if shard.TargetShardIndex < 0 || shard.TargetShardIndex >=
len(clonedCluster.Shards) {
log.Error("Invalid target shard index",
zap.Int("index", shard.TargetShardIndex))
+ return
}
- targetMasterNode :=
cluster.Shards[shard.TargetShardIndex].GetMasterNode()
+ targetMasterNode :=
clonedCluster.Shards[shard.TargetShardIndex].GetMasterNode()
switch sourceNodeClusterInfo.MigratingState {
case "none", "start":
continue
case "fail":
- c.clusterMu.Lock()
- cluster.Shards[i].ClearMigrateState()
- c.clusterMu.Unlock()
- if err := c.clusterStore.SetCluster(ctx, c.namespace,
cluster); err != nil {
- log.Error("Failed to clear the migrate state",
zap.Error(err))
+ clonedCluster.Shards[i].ClearMigrateState()
+ if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, clonedCluster); err != nil {
+ log.Error("Failed to update the cluster",
zap.Error(err))
+ return
}
+ c.updateCluster(clonedCluster)
log.Warn("Failed to migrate the slot", zap.Int("slot",
shard.MigratingSlot))
case "success":
- err := cluster.SetSlot(ctx, shard.MigratingSlot,
targetMasterNode.ID())
+ err := clonedCluster.SetSlot(ctx, shard.MigratingSlot,
targetMasterNode.ID())
if err != nil {
log.Error("Failed to set the slot",
zap.Error(err))
return
}
- cluster.Shards[i].SlotRanges =
store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges,
shard.MigratingSlot)
- cluster.Shards[shard.TargetShardIndex].SlotRanges =
store.AddSlotToSlotRanges(
-
cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
- cluster.Shards[i].ClearMigrateState()
- if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, cluster); err != nil {
+ clonedCluster.Shards[i].SlotRanges =
store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges,
shard.MigratingSlot)
+ clonedCluster.Shards[shard.TargetShardIndex].SlotRanges
= store.AddSlotToSlotRanges(
+
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
+ clonedCluster.Shards[i].ClearMigrateState()
+ if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster",
zap.Error(err))
} else {
log.Info("Migrate the slot successfully",
zap.Int("slot", shard.MigratingSlot))
}
+ c.updateCluster(clonedCluster)
default:
log.Error("Unknown migrating state",
zap.String("state", sourceNodeClusterInfo.MigratingState))
}
@@ -332,12 +341,12 @@ func (c *ClusterChecker) migrationLoop() {
return
case <-ticker.C:
c.clusterMu.Lock()
- cluster := c.cluster
+ clonedCluster := c.cluster.Clone()
c.clusterMu.Unlock()
- if cluster == nil {
+ if clonedCluster == nil {
continue
}
- c.tryUpdateMigrationStatus(c.ctx, cluster)
+ c.tryUpdateMigrationStatus(c.ctx, clonedCluster)
}
}
}
diff --git a/store/cluster.go b/store/cluster.go
index d4beb06..d4713dd 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -77,6 +77,18 @@ func NewCluster(name string, nodes []string, replicas int)
(*Cluster, error) {
return cluster, nil
}
+func (cluster *Cluster) Clone() *Cluster {
+ clone := &Cluster{
+ Name: cluster.Name,
+ Shards: make([]*Shard, 0),
+ }
+ clone.Version.Store(cluster.Version.Load())
+ for _, shard := range cluster.Shards {
+ clone.Shards = append(clone.Shards, shard.Clone())
+ }
+ return clone
+}
+
// SetPassword will set the password for all nodes in the cluster.
func (cluster *Cluster) SetPassword(password string) {
for i := 0; i < len(cluster.Shards); i++ {
diff --git a/store/cluster_shard.go b/store/cluster_shard.go
index e15333b..1de4dc5 100644
--- a/store/cluster_shard.go
+++ b/store/cluster_shard.go
@@ -66,6 +66,17 @@ func NewShard() *Shard {
}
}
+func (shard *Shard) Clone() *Shard {
+ clone := NewShard()
+ clone.SlotRanges = make([]SlotRange, len(shard.SlotRanges))
+ copy(clone.SlotRanges, shard.SlotRanges)
+ clone.TargetShardIndex = shard.TargetShardIndex
+ clone.MigratingSlot = shard.MigratingSlot
+ clone.Nodes = make([]Node, len(shard.Nodes))
+ copy(clone.Nodes, shard.Nodes)
+ return clone
+}
+
func (shard *Shard) ClearMigrateState() {
shard.MigratingSlot = -1
shard.TargetShardIndex = -1