This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 0032e54  Fix wrongly check if the migrating slot in the source node 
(#269)
0032e54 is described below

commit 0032e5418d942fc6799a14e24e7c8e17aa0630c5
Author: hulk <[email protected]>
AuthorDate: Thu Feb 20 09:32:19 2025 +0800

    Fix wrongly check if the migrating slot in the source node (#269)
    
    * Fix wrongly check if the migrating slot in the source node
    
     Kvrocks will return `migrating_slot(s)` instead of `migrating_slot`
     after the slot range migration is supported. From the controller, we
     need to allow both of them to be compatible with the old behavior.
    
    * Fix data race
---
 controller/cluster.go      |  8 ++++++--
 server/api/cluster_test.go | 46 +++++++++++++++++++++++++++++++++++-----------
 store/cluster_node.go      |  3 ++-
 3 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/controller/cluster.go b/controller/cluster.go
index eea8ff8..1b642da 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -288,7 +288,10 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx 
context.Context, clonedClu
                        return
                }
                if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
-                       log.Error("Mismatch migrate slot", zap.Int("slot", 
shard.MigratingSlot))
+                       log.Error("Mismatch migrating slot",
+                               zap.Int("source_migrating_slot", 
sourceNodeClusterInfo.MigratingSlot),
+                               zap.Int("migrating_slot", shard.MigratingSlot),
+                       )
                        return
                }
                if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= 
len(clonedCluster.Shards) {
@@ -301,13 +304,14 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx 
context.Context, clonedClu
                case "none", "start":
                        continue
                case "fail":
+                       migratingSlot := shard.MigratingSlot
                        clonedCluster.Shards[i].ClearMigrateState()
                        if err := c.clusterStore.UpdateCluster(ctx, 
c.namespace, clonedCluster); err != nil {
                                log.Error("Failed to update the cluster", 
zap.Error(err))
                                return
                        }
                        c.updateCluster(clonedCluster)
-                       log.Warn("Failed to migrate the slot", zap.Int("slot", 
shard.MigratingSlot))
+                       log.Warn("Failed to migrate the slot", zap.Int("slot", 
migratingSlot))
                case "success":
                        err := clonedCluster.SetSlot(ctx, shard.MigratingSlot, 
targetMasterNode.ID())
                        if err != nil {
diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go
index 78f5d42..53995ba 100644
--- a/server/api/cluster_test.go
+++ b/server/api/cluster_test.go
@@ -27,11 +27,14 @@ import (
        "net/http"
        "net/http/httptest"
        "testing"
+       "time"
 
        "github.com/gin-gonic/gin"
        "github.com/stretchr/testify/require"
 
+       "github.com/apache/kvrocks-controller/config"
        "github.com/apache/kvrocks-controller/consts"
+       "github.com/apache/kvrocks-controller/controller"
        "github.com/apache/kvrocks-controller/server/middleware"
        "github.com/apache/kvrocks-controller/store"
        "github.com/apache/kvrocks-controller/store/engine"
@@ -117,10 +120,12 @@ func TestClusterBasics(t *testing.T) {
        })
 
        t.Run("migrate slot only", func(t *testing.T) {
+               handler := &ClusterHandler{s: 
store.NewClusterStore(engine.NewMock())}
+               clusterName := "test-migrate-slot-only-cluster"
                recorder := httptest.NewRecorder()
                ctx := GetTestContext(recorder)
                ctx.Set(consts.ContextKeyStore, handler.s)
-               ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: 
"cluster", Value: "test-cluster"}}
+               ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: 
"cluster", Value: clusterName}}
                testMigrateReq := &MigrateSlotRequest{
                        Slot:     3,
                        SlotOnly: true,
@@ -130,18 +135,19 @@ func TestClusterBasics(t *testing.T) {
                require.NoError(t, err)
                ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
 
-               before, err := handler.s.GetCluster(ctx, ns, "test-cluster")
+               cluster, err := store.NewCluster(clusterName, 
[]string{"127.0.0.1:1111", "127.0.0.1:2222"}, 1)
+               require.NoError(t, err)
+               require.NoError(t, handler.s.CreateCluster(ctx, ns, cluster))
+
+               before, err := handler.s.GetCluster(ctx, ns, clusterName)
                require.NoError(t, err)
                require.EqualValues(t, store.SlotRange{Start: 0, Stop: 8191}, 
before.Shards[0].SlotRanges[0])
                require.EqualValues(t, store.SlotRange{Start: 8192, Stop: 
store.MaxSlotID}, before.Shards[1].SlotRanges[0])
 
                middleware.RequiredCluster(ctx)
-               if recorder.Code != http.StatusOK {
-                       return
-               }
                handler.MigrateSlot(ctx)
                require.Equal(t, http.StatusOK, recorder.Code)
-               after, err := handler.s.GetCluster(ctx, ns, "test-cluster")
+               after, err := handler.s.GetCluster(ctx, ns, clusterName)
                require.NoError(t, err)
 
                require.EqualValues(t, before.Version.Add(1), 
after.Version.Load())
@@ -221,11 +227,12 @@ func TestClusterMigrateData(t *testing.T) {
                        require.NoError(t, node.SyncClusterInfo(ctx, cluster))
                }
        }
+       handler.s.CreateCluster(ctx, ns, cluster)
 
        recorder := httptest.NewRecorder()
        reqCtx := GetTestContext(recorder)
        reqCtx.Set(consts.ContextKeyStore, handler.s)
-       reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: 
"cluster", Value: "test-cluster"}}
+       reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: 
"cluster", Value: clusterName}}
        testMigrateReq := &MigrateSlotRequest{
                Slot:   0,
                Target: 1,
@@ -234,16 +241,33 @@ func TestClusterMigrateData(t *testing.T) {
        require.NoError(t, err)
        reqCtx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
        middleware.RequiredCluster(reqCtx)
-       if recorder.Code != http.StatusOK {
-               return
-       }
        handler.MigrateSlot(reqCtx)
        require.Equal(t, http.StatusOK, recorder.Code)
 
-       gotCluster, err := handler.s.GetCluster(ctx, ns, "test-cluster")
+       gotCluster, err := handler.s.GetCluster(ctx, ns, clusterName)
        require.NoError(t, err)
        require.EqualValues(t, 1, gotCluster.Version.Load())
        require.Len(t, gotCluster.Shards[0].SlotRanges, 1)
        require.EqualValues(t, 0, gotCluster.Shards[0].MigratingSlot)
        require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex)
+
+       ctrl, err := controller.New(handler.s.(*store.ClusterStore), 
&config.ControllerConfig{
+               FailOver: &config.FailOverConfig{
+                       PingIntervalSeconds: 1,
+                       MaxPingCount:        3,
+               }})
+       require.NoError(t, err)
+       require.NoError(t, ctrl.Start(ctx))
+       ctrl.WaitForReady()
+       defer ctrl.Close()
+
+       // Migration will fail because the source node cannot connect to 
the target node;
+       // we just use it to confirm that the migration loop took effect.
+       require.Eventually(t, func() bool {
+               gotCluster, err := handler.s.GetCluster(ctx, ns, "test-cluster")
+               if err != nil {
+                       return false
+               }
+               return gotCluster.Shards[0].MigratingSlot == -1
+       }, 10*time.Second, 100*time.Millisecond)
 }
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 177f015..33274a5 100644
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -191,7 +191,8 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context) 
(*ClusterInfo, error)
                        if err != nil {
                                return nil, err
                        }
-               case "migrating_slot":
+               case "migrating_slot", "migrating_slot(s)":
+                       // TODO(@git-hulk): handle multiple migrating slots
                        clusterInfo.MigratingSlot, err = strconv.Atoi(fields[1])
                        if err != nil {
                                return nil, err

Reply via email to