This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new d7d264f  Fix sourceNodeClusterInfo.MigratingSlot maybe nil when 
tryUpdateMigrationStatus (#357)
d7d264f is described below

commit d7d264f29b87acc7c755fef64801eb5b40ad09f6
Author: Lele Huang <[email protected]>
AuthorDate: Sat Oct 4 15:39:41 2025 +0800

    Fix sourceNodeClusterInfo.MigratingSlot maybe nil when 
tryUpdateMigrationStatus (#357)
---
 controller/cluster.go | 27 ++++++++++++++++++---------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/controller/cluster.go b/controller/cluster.go
index 5484311..752ec60 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -338,48 +338,57 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx 
context.Context, clonedClu
                        ).Error("Failed to get the cluster info from the source 
node", zap.Error(err))
                        continue
                }
-               if 
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
+
+               // If the source node reports no migrating slot, or its 
migrating slot differs from the one recorded on the shard,
+               // clear the shard's migration state on the controller and 
persist the change.
+               if sourceNodeClusterInfo.MigratingSlot == nil || 
(sourceNodeClusterInfo.MigratingSlot != nil &&
+                       
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange)) {
                        log.Error("Mismatch migrating slot",
                                zap.Int("shard_index", i),
-                               zap.String("source_migrating_slot", 
sourceNodeClusterInfo.MigratingSlot.String()),
                                zap.String("migrating_slot", 
shard.MigratingSlot.String()),
                        )
+                       clonedCluster.Shards[i].ClearMigrateState()
+                       if err = c.clusterStore.UpdateCluster(ctx, c.namespace, 
clonedCluster); err != nil {
+                               log.Error("Failed to update the migrate state 
by UpdateCluster method", zap.Error(err))
+                               return
+                       }
+                       c.updateCluster(clonedCluster)
                        continue
                }
+
                if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= 
len(clonedCluster.Shards) {
                        log.Error("Invalid target shard index", 
zap.Int("index", shard.TargetShardIndex))
                        return
                }
 
+               migratingSlot := shard.MigratingSlot.String()
                switch sourceNodeClusterInfo.MigratingState {
                case "none", "start":
                        continue
                case "fail":
-                       migratingSlot := shard.MigratingSlot
                        clonedCluster.Shards[i].ClearMigrateState()
-                       if err := c.clusterStore.UpdateCluster(ctx, 
c.namespace, clonedCluster); err != nil {
+                       if err = c.clusterStore.UpdateCluster(ctx, c.namespace, 
clonedCluster); err != nil {
                                log.Error("Failed to update the cluster", 
zap.Error(err))
                                return
                        }
                        c.updateCluster(clonedCluster)
-                       log.Warn("Failed to migrate the slot", 
zap.String("slot", migratingSlot.String()))
+                       log.Warn("Failed to migrate the slot", 
zap.String("slot", migratingSlot))
                case "success":
                        clonedCluster.Shards[i].SlotRanges = 
store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, 
shard.MigratingSlot.SlotRange)
                        clonedCluster.Shards[shard.TargetShardIndex].SlotRanges 
= store.AddSlotToSlotRanges(
                                
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, 
shard.MigratingSlot.SlotRange,
                        )
-                       migratedSlot := shard.MigratingSlot
                        clonedCluster.Shards[i].ClearMigrateState()
-                       if err := c.clusterStore.UpdateCluster(ctx, 
c.namespace, clonedCluster); err != nil {
+                       if err = c.clusterStore.UpdateCluster(ctx, c.namespace, 
clonedCluster); err != nil {
                                log.Error("Failed to update the cluster", 
zap.Error(err))
                                return
                        } else {
-                               log.Info("Migrate the slot successfully", 
zap.String("slot", migratedSlot.String()))
+                               log.Info("Migrate the slot successfully", 
zap.String("slot", migratingSlot))
                        }
                        c.updateCluster(clonedCluster)
                default:
                        clonedCluster.Shards[i].ClearMigrateState()
-                       if err := c.clusterStore.UpdateCluster(ctx, 
c.namespace, clonedCluster); err != nil {
+                       if err = c.clusterStore.UpdateCluster(ctx, c.namespace, 
clonedCluster); err != nil {
                                log.Error("Failed to update the cluster", 
zap.Error(err))
                                return
                        }

Reply via email to