This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new d7d264f Fix sourceNodeClusterInfo.MigratingSlot maybe nil when
tryUpdateMigrationStatus (#357)
d7d264f is described below
commit d7d264f29b87acc7c755fef64801eb5b40ad09f6
Author: Lele Huang <[email protected]>
AuthorDate: Sat Oct 4 15:39:41 2025 +0800
Fix sourceNodeClusterInfo.MigratingSlot maybe nil when
tryUpdateMigrationStatus (#357)
---
controller/cluster.go | 27 ++++++++++++++++++---------
1 file changed, 18 insertions(+), 9 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index 5484311..752ec60 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -338,48 +338,57 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, clonedClu
).Error("Failed to get the cluster info from the source
node", zap.Error(err))
continue
}
- if
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
+
+ // If the source node reports no migrating slot, or its migrating
slot does not match the one recorded on the shard,
+ // clear the shard's stale migration state on the
controller.
+ if sourceNodeClusterInfo.MigratingSlot == nil ||
(sourceNodeClusterInfo.MigratingSlot != nil &&
+
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange)) {
log.Error("Mismatch migrating slot",
zap.Int("shard_index", i),
- zap.String("source_migrating_slot",
sourceNodeClusterInfo.MigratingSlot.String()),
zap.String("migrating_slot",
shard.MigratingSlot.String()),
)
+ clonedCluster.Shards[i].ClearMigrateState()
+ if err = c.clusterStore.UpdateCluster(ctx, c.namespace,
clonedCluster); err != nil {
+ log.Error("Failed to update the migrate state
by UpdateCluster method", zap.Error(err))
+ return
+ }
+ c.updateCluster(clonedCluster)
continue
}
+
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >=
len(clonedCluster.Shards) {
log.Error("Invalid target shard index",
zap.Int("index", shard.TargetShardIndex))
return
}
+ migratingSlot := shard.MigratingSlot.String()
switch sourceNodeClusterInfo.MigratingState {
case "none", "start":
continue
case "fail":
- migratingSlot := shard.MigratingSlot
clonedCluster.Shards[i].ClearMigrateState()
- if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, clonedCluster); err != nil {
+ if err = c.clusterStore.UpdateCluster(ctx, c.namespace,
clonedCluster); err != nil {
log.Error("Failed to update the cluster",
zap.Error(err))
return
}
c.updateCluster(clonedCluster)
- log.Warn("Failed to migrate the slot",
zap.String("slot", migratingSlot.String()))
+ log.Warn("Failed to migrate the slot",
zap.String("slot", migratingSlot))
case "success":
clonedCluster.Shards[i].SlotRanges =
store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges,
shard.MigratingSlot.SlotRange)
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges
= store.AddSlotToSlotRanges(
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges,
shard.MigratingSlot.SlotRange,
)
- migratedSlot := shard.MigratingSlot
clonedCluster.Shards[i].ClearMigrateState()
- if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, clonedCluster); err != nil {
+ if err = c.clusterStore.UpdateCluster(ctx, c.namespace,
clonedCluster); err != nil {
log.Error("Failed to update the cluster",
zap.Error(err))
return
} else {
- log.Info("Migrate the slot successfully",
zap.String("slot", migratedSlot.String()))
+ log.Info("Migrate the slot successfully",
zap.String("slot", migratingSlot))
}
c.updateCluster(clonedCluster)
default:
clonedCluster.Shards[i].ClearMigrateState()
- if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, clonedCluster); err != nil {
+ if err = c.clusterStore.UpdateCluster(ctx, c.namespace,
clonedCluster); err != nil {
log.Error("Failed to update the cluster",
zap.Error(err))
return
}