This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 0032e54 Fix wrongly check if the migrating slot in the source node
(#269)
0032e54 is described below
commit 0032e5418d942fc6799a14e24e7c8e17aa0630c5
Author: hulk <[email protected]>
AuthorDate: Thu Feb 20 09:32:19 2025 +0800
Fix wrongly check if the migrating slot in the source node (#269)
* Fix wrongly check if the migrating slot in the source node
Kvrocks will return `migrating_slot(s)` instead of `migrating_slot`
after the slot range migration is supported. From the controller, we
need to allow both of them to be compatible with the old behavior.
* Fix data race
---
controller/cluster.go | 8 ++++++--
server/api/cluster_test.go | 46 +++++++++++++++++++++++++++++++++++-----------
store/cluster_node.go | 3 ++-
3 files changed, 43 insertions(+), 14 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index eea8ff8..1b642da 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -288,7 +288,10 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, clonedClu
return
}
if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
- log.Error("Mismatch migrate slot", zap.Int("slot",
shard.MigratingSlot))
+ log.Error("Mismatch migrating slot",
+ zap.Int("source_migrating_slot",
sourceNodeClusterInfo.MigratingSlot),
+ zap.Int("migrating_slot", shard.MigratingSlot),
+ )
return
}
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >=
len(clonedCluster.Shards) {
@@ -301,13 +304,14 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, clonedClu
case "none", "start":
continue
case "fail":
+ migratingSlot := shard.MigratingSlot
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster",
zap.Error(err))
return
}
c.updateCluster(clonedCluster)
- log.Warn("Failed to migrate the slot", zap.Int("slot",
shard.MigratingSlot))
+ log.Warn("Failed to migrate the slot", zap.Int("slot",
migratingSlot))
case "success":
err := clonedCluster.SetSlot(ctx, shard.MigratingSlot,
targetMasterNode.ID())
if err != nil {
diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go
index 78f5d42..53995ba 100644
--- a/server/api/cluster_test.go
+++ b/server/api/cluster_test.go
@@ -27,11 +27,14 @@ import (
"net/http"
"net/http/httptest"
"testing"
+ "time"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
+ "github.com/apache/kvrocks-controller/config"
"github.com/apache/kvrocks-controller/consts"
+ "github.com/apache/kvrocks-controller/controller"
"github.com/apache/kvrocks-controller/server/middleware"
"github.com/apache/kvrocks-controller/store"
"github.com/apache/kvrocks-controller/store/engine"
@@ -117,10 +120,12 @@ func TestClusterBasics(t *testing.T) {
})
t.Run("migrate slot only", func(t *testing.T) {
+ handler := &ClusterHandler{s:
store.NewClusterStore(engine.NewMock())}
+ clusterName := "test-migrate-slot-only-cluster"
recorder := httptest.NewRecorder()
ctx := GetTestContext(recorder)
ctx.Set(consts.ContextKeyStore, handler.s)
- ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key:
"cluster", Value: "test-cluster"}}
+ ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key:
"cluster", Value: clusterName}}
testMigrateReq := &MigrateSlotRequest{
Slot: 3,
SlotOnly: true,
@@ -130,18 +135,19 @@ func TestClusterBasics(t *testing.T) {
require.NoError(t, err)
ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
- before, err := handler.s.GetCluster(ctx, ns, "test-cluster")
+ cluster, err := store.NewCluster(clusterName,
[]string{"127.0.0.1:1111", "127.0.0.1:2222"}, 1)
+ require.NoError(t, err)
+ require.NoError(t, handler.s.CreateCluster(ctx, ns, cluster))
+
+ before, err := handler.s.GetCluster(ctx, ns, clusterName)
require.NoError(t, err)
require.EqualValues(t, store.SlotRange{Start: 0, Stop: 8191},
before.Shards[0].SlotRanges[0])
require.EqualValues(t, store.SlotRange{Start: 8192, Stop:
store.MaxSlotID}, before.Shards[1].SlotRanges[0])
middleware.RequiredCluster(ctx)
- if recorder.Code != http.StatusOK {
- return
- }
handler.MigrateSlot(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
- after, err := handler.s.GetCluster(ctx, ns, "test-cluster")
+ after, err := handler.s.GetCluster(ctx, ns, clusterName)
require.NoError(t, err)
require.EqualValues(t, before.Version.Add(1),
after.Version.Load())
@@ -221,11 +227,12 @@ func TestClusterMigrateData(t *testing.T) {
require.NoError(t, node.SyncClusterInfo(ctx, cluster))
}
}
+ handler.s.CreateCluster(ctx, ns, cluster)
recorder := httptest.NewRecorder()
reqCtx := GetTestContext(recorder)
reqCtx.Set(consts.ContextKeyStore, handler.s)
- reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key:
"cluster", Value: "test-cluster"}}
+ reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key:
"cluster", Value: clusterName}}
testMigrateReq := &MigrateSlotRequest{
Slot: 0,
Target: 1,
@@ -234,16 +241,33 @@ func TestClusterMigrateData(t *testing.T) {
require.NoError(t, err)
reqCtx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
middleware.RequiredCluster(reqCtx)
- if recorder.Code != http.StatusOK {
- return
- }
handler.MigrateSlot(reqCtx)
require.Equal(t, http.StatusOK, recorder.Code)
- gotCluster, err := handler.s.GetCluster(ctx, ns, "test-cluster")
+ gotCluster, err := handler.s.GetCluster(ctx, ns, clusterName)
require.NoError(t, err)
require.EqualValues(t, 1, gotCluster.Version.Load())
require.Len(t, gotCluster.Shards[0].SlotRanges, 1)
require.EqualValues(t, 0, gotCluster.Shards[0].MigratingSlot)
require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex)
+
+ ctrl, err := controller.New(handler.s.(*store.ClusterStore),
&config.ControllerConfig{
+ FailOver: &config.FailOverConfig{
+ PingIntervalSeconds: 1,
+ MaxPingCount: 3,
+ }})
+ require.NoError(t, err)
+ require.NoError(t, ctrl.Start(ctx))
+ ctrl.WaitForReady()
+ defer ctrl.Close()
+
+ // Migration will fail because the source node cannot connect to
the target node;
+ // we just use it to confirm that the migration loop took effect.
+ require.Eventually(t, func() bool {
+ gotCluster, err := handler.s.GetCluster(ctx, ns, "test-cluster")
+ if err != nil {
+ return false
+ }
+ return gotCluster.Shards[0].MigratingSlot == -1
+ }, 10*time.Second, 100*time.Millisecond)
}
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 177f015..33274a5 100644
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -191,7 +191,8 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context)
(*ClusterInfo, error)
if err != nil {
return nil, err
}
- case "migrating_slot":
+ case "migrating_slot", "migrating_slot(s)":
+ // TODO(@git-hulk): handle multiple migrating slots
clusterInfo.MigratingSlot, err = strconv.Atoi(fields[1])
if err != nil {
return nil, err