This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new bb665f6f Fix should forbid importing the slot which belongs to itself
in cluster mode (#2209)
bb665f6f is described below
commit bb665f6fc2de7cb1215240866d48e92adfed43b1
Author: hulk <[email protected]>
AuthorDate: Sat Mar 30 22:58:10 2024 +0800
Fix should forbid importing the slot which belongs to itself in cluster
mode (#2209)
---
src/cluster/cluster.cc | 4 +
src/cluster/slot_import.cc | 4 +
.../integration/slotimport/slotimport_test.go | 96 ++++++++++++++--------
3 files changed, 68 insertions(+), 36 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index bb6c30de..686b63a0 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -318,6 +318,10 @@ Status Cluster::ImportSlot(redis::Connection *conn, int
slot, int state) {
if (!IsValidSlot(slot)) {
return {Status::NotOK, errSlotOutOfRange};
}
+ auto source_node = srv_->cluster->slots_nodes_[slot];
+ if (source_node && source_node->id == myid_) {
+ return {Status::NotOK, "Can't import slot which belongs to me"};
+ }
Status s;
switch (state) {
diff --git a/src/cluster/slot_import.cc b/src/cluster/slot_import.cc
index a3988e76..dcc97d66 100644
--- a/src/cluster/slot_import.cc
+++ b/src/cluster/slot_import.cc
@@ -31,6 +31,10 @@ SlotImport::SlotImport(Server *srv)
Status SlotImport::Start(int slot) {
std::lock_guard<std::mutex> guard(mutex_);
if (import_status_ == kImportStart) {
+ // return ok if the same slot is importing
+ if (import_slot_ == slot) {
+ return Status::OK();
+ }
return {Status::NotOK, fmt::format("only one importing slot is allowed,
current slot is: {}", import_slot_)};
}
diff --git a/tests/gocase/integration/slotimport/slotimport_test.go
b/tests/gocase/integration/slotimport/slotimport_test.go
index 9a1830fa..1d427b80 100644
--- a/tests/gocase/integration/slotimport/slotimport_test.go
+++ b/tests/gocase/integration/slotimport/slotimport_test.go
@@ -22,6 +22,7 @@ package slotimport
import (
"context"
"fmt"
+ "strings"
"testing"
"time"
@@ -61,23 +62,39 @@ func TestImportSlaveServer(t *testing.T) {
func TestImportedServer(t *testing.T) {
ctx := context.Background()
- srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
- defer func() { srv.Close() }()
- rdb := srv.NewClient()
- defer func() { require.NoError(t, rdb.Close()) }()
- srvID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
- clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", srvID,
srv.Port())
- require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODEID", srvID).Err())
- require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
+ srvA := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srvA.Close() }()
+ rdbA := srvA.NewClient()
+ defer func() { require.NoError(t, rdbA.Close()) }()
+ srvAID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdbA.Do(ctx, "clusterx", "SETNODEID", srvAID).Err())
+
+ srvB := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srvB.Close() }()
+ rdbB := srvB.NewClient()
+ defer func() { require.NoError(t, rdbB.Close()) }()
+ srvBID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODEID", srvBID).Err())
+
+ clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-8191", srvAID,
srvA.Port())
+ clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d master - 8192-16383",
clusterNodes, srvBID, srvB.Port())
+
+ require.NoError(t, rdbA.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
+ require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODES", clusterNodes,
"1").Err())
t.Run("IMPORT - error slot", func(t *testing.T) {
- require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", -1,
0).Err(), "Slot is out of range")
- require.ErrorContains(t, rdb.Do(ctx, "cluster", "import",
16384, 0).Err(), "Slot is out of range")
+ require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", -1,
0).Err(), "Slot is out of range")
+ require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import",
16384, 0).Err(), "Slot is out of range")
})
t.Run("IMPORT - slot with error state", func(t *testing.T) {
- require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 1,
4).Err(), "Invalid import state")
- require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 1,
-3).Err(), "Invalid import state")
+ require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 1,
4).Err(), "Invalid import state")
+ require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 1,
-3).Err(), "Invalid import state")
+ })
+
+ t.Run("IMPORT - slot with wrong state", func(t *testing.T) {
+ require.Contains(t, rdbA.Do(ctx, "cluster", "import", 1,
0).Err(),
+ "Can't import slot which belongs to me")
})
t.Run("IMPORT - slot states in right order", func(t *testing.T) {
@@ -85,66 +102,73 @@ func TestImportedServer(t *testing.T) {
slotKey := util.SlotTable[slotNum]
// import start
- require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import",
slotNum, 0).Val())
- require.NoError(t, rdb.Set(ctx, slotKey, "slot1", 0).Err())
- require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val())
- clusterInfo := rdb.ClusterInfo(ctx).Val()
+ require.NoError(t, rdbA.Set(ctx, slotKey, "slot1", 0).Err())
+ require.Equal(t, "slot1", rdbA.Get(ctx, slotKey).Val())
+ require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import",
slotNum, 0).Val())
+ clusterInfo := rdbB.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 1")
require.Contains(t, clusterInfo, "import_state: start")
+ clusterNodes := rdbB.ClusterNodes(ctx).Val()
+ require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]",
slotNum, srvAID))
+
+ require.NoError(t, rdbA.Do(ctx, "clusterx", "migrate", slotNum,
srvBID).Err())
+ require.Eventually(t, func() bool {
+ clusterInfo :=
rdbA.ClusterInfo(context.Background()).Val()
+ return strings.Contains(clusterInfo,
fmt.Sprintf("migrating_slot: %d", slotNum)) &&
+ strings.Contains(clusterInfo,
fmt.Sprintf("migrating_state: %s", "success"))
+ }, 5*time.Second, 100*time.Millisecond)
- clusterNodes := rdb.ClusterNodes(ctx).Val()
- require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]",
slotNum, srvID))
// import success
- require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import",
slotNum, 1).Val())
- clusterInfo = rdb.ClusterInfo(ctx).Val()
+ require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import",
slotNum, 1).Val())
+ clusterInfo = rdbB.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 1")
require.Contains(t, clusterInfo, "import_state: success")
// import finish and should not contain the import section
- clusterNodes = rdb.ClusterNodes(ctx).Val()
- require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]",
slotNum, srvID))
+ clusterNodes = rdbB.ClusterNodes(ctx).Val()
+ require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]",
slotNum, srvAID))
time.Sleep(50 * time.Millisecond)
- require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val())
+ require.Equal(t, "slot1", rdbB.Get(ctx, slotKey).Val())
})
t.Run("IMPORT - slot state 'error'", func(t *testing.T) {
slotNum := 10
slotKey := util.SlotTable[slotNum]
- require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import",
slotNum, 0).Val())
- require.NoError(t, rdb.Set(ctx, slotKey, "slot10_again",
0).Err())
- require.Equal(t, "slot10_again", rdb.Get(ctx, slotKey).Val())
+ require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import",
slotNum, 0).Val())
+ require.NoError(t, rdbB.Set(ctx, slotKey, "slot10_again",
0).Err())
+ require.Equal(t, "slot10_again", rdbB.Get(ctx, slotKey).Val())
// import error
- require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import",
slotNum, 2).Val())
+ require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import",
slotNum, 2).Val())
time.Sleep(50 * time.Millisecond)
- clusterInfo := rdb.ClusterInfo(ctx).Val()
+ clusterInfo := rdbB.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 10")
require.Contains(t, clusterInfo, "import_state: error")
// get empty
- require.Zero(t, rdb.Exists(ctx, slotKey).Val())
+ require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
})
t.Run("IMPORT - connection broken", func(t *testing.T) {
slotNum := 11
slotKey := util.SlotTable[slotNum]
- require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import",
slotNum, 0).Val())
- require.NoError(t, rdb.Set(ctx, slotKey, "slot11", 0).Err())
- require.Equal(t, "slot11", rdb.Get(ctx, slotKey).Val())
+ require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import",
slotNum, 0).Val())
+ require.NoError(t, rdbB.Set(ctx, slotKey, "slot11", 0).Err())
+ require.Equal(t, "slot11", rdbB.Get(ctx, slotKey).Val())
// close connection, server will stop importing
- require.NoError(t, rdb.Close())
- rdb = srv.NewClient()
+ require.NoError(t, rdbB.Close())
+ rdbB = srvB.NewClient()
time.Sleep(50 * time.Millisecond)
- clusterInfo := rdb.ClusterInfo(ctx).Val()
+ clusterInfo := rdbB.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 11")
require.Contains(t, clusterInfo, "import_state: error")
// get empty
- require.Zero(t, rdb.Exists(ctx, slotKey).Val())
+ require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
})
}