This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new bb665f6f Fix should forbid importing the slot which belongs to itself 
in cluster mode (#2209)
bb665f6f is described below

commit bb665f6fc2de7cb1215240866d48e92adfed43b1
Author: hulk <[email protected]>
AuthorDate: Sat Mar 30 22:58:10 2024 +0800

    Fix should forbid importing the slot which belongs to itself in cluster 
mode (#2209)
---
 src/cluster/cluster.cc                             |  4 +
 src/cluster/slot_import.cc                         |  4 +
 .../integration/slotimport/slotimport_test.go      | 96 ++++++++++++++--------
 3 files changed, 68 insertions(+), 36 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index bb6c30de..686b63a0 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -318,6 +318,10 @@ Status Cluster::ImportSlot(redis::Connection *conn, int 
slot, int state) {
   if (!IsValidSlot(slot)) {
     return {Status::NotOK, errSlotOutOfRange};
   }
+  auto source_node = srv_->cluster->slots_nodes_[slot];
+  if (source_node && source_node->id == myid_) {
+    return {Status::NotOK, "Can't import slot which belongs to me"};
+  }
 
   Status s;
   switch (state) {
diff --git a/src/cluster/slot_import.cc b/src/cluster/slot_import.cc
index a3988e76..dcc97d66 100644
--- a/src/cluster/slot_import.cc
+++ b/src/cluster/slot_import.cc
@@ -31,6 +31,10 @@ SlotImport::SlotImport(Server *srv)
 Status SlotImport::Start(int slot) {
   std::lock_guard<std::mutex> guard(mutex_);
   if (import_status_ == kImportStart) {
+    // return ok if the same slot is importing
+    if (import_slot_ == slot) {
+      return Status::OK();
+    }
     return {Status::NotOK, fmt::format("only one importing slot is allowed, 
current slot is: {}", import_slot_)};
   }
 
diff --git a/tests/gocase/integration/slotimport/slotimport_test.go 
b/tests/gocase/integration/slotimport/slotimport_test.go
index 9a1830fa..1d427b80 100644
--- a/tests/gocase/integration/slotimport/slotimport_test.go
+++ b/tests/gocase/integration/slotimport/slotimport_test.go
@@ -22,6 +22,7 @@ package slotimport
 import (
        "context"
        "fmt"
+       "strings"
        "testing"
        "time"
 
@@ -61,23 +62,39 @@ func TestImportSlaveServer(t *testing.T) {
 func TestImportedServer(t *testing.T) {
        ctx := context.Background()
 
-       srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
-       defer func() { srv.Close() }()
-       rdb := srv.NewClient()
-       defer func() { require.NoError(t, rdb.Close()) }()
-       srvID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
-       clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", srvID, 
srv.Port())
-       require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODEID", srvID).Err())
-       require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+       srvA := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       defer func() { srvA.Close() }()
+       rdbA := srvA.NewClient()
+       defer func() { require.NoError(t, rdbA.Close()) }()
+       srvAID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, rdbA.Do(ctx, "clusterx", "SETNODEID", srvAID).Err())
+
+       srvB := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       defer func() { srvB.Close() }()
+       rdbB := srvB.NewClient()
+       defer func() { require.NoError(t, rdbB.Close()) }()
+       srvBID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODEID", srvBID).Err())
+
+       clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-8191", srvAID, 
srvA.Port())
+       clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d master - 8192-16383", 
clusterNodes, srvBID, srvB.Port())
+
+       require.NoError(t, rdbA.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+       require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
 
        t.Run("IMPORT - error slot", func(t *testing.T) {
-               require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", -1, 
0).Err(), "Slot is out of range")
-               require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 
16384, 0).Err(), "Slot is out of range")
+               require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", -1, 
0).Err(), "Slot is out of range")
+               require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 
16384, 0).Err(), "Slot is out of range")
        })
 
        t.Run("IMPORT - slot with error state", func(t *testing.T) {
-               require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 1, 
4).Err(), "Invalid import state")
-               require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 1, 
-3).Err(), "Invalid import state")
+               require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 1, 
4).Err(), "Invalid import state")
+               require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 1, 
-3).Err(), "Invalid import state")
+       })
+
+       t.Run("IMPORT - slot with wrong state", func(t *testing.T) {
+               require.Contains(t, rdbA.Do(ctx, "cluster", "import", 1, 
0).Err(),
+                       "Can't import slot which belongs to me")
        })
 
        t.Run("IMPORT - slot states in right order", func(t *testing.T) {
@@ -85,66 +102,73 @@ func TestImportedServer(t *testing.T) {
                slotKey := util.SlotTable[slotNum]
 
                // import start
-               require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
-               require.NoError(t, rdb.Set(ctx, slotKey, "slot1", 0).Err())
-               require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val())
-               clusterInfo := rdb.ClusterInfo(ctx).Val()
+               require.NoError(t, rdbA.Set(ctx, slotKey, "slot1", 0).Err())
+               require.Equal(t, "slot1", rdbA.Get(ctx, slotKey).Val())
+               require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
+               clusterInfo := rdbB.ClusterInfo(ctx).Val()
                require.Contains(t, clusterInfo, "importing_slot: 1")
                require.Contains(t, clusterInfo, "import_state: start")
+               clusterNodes := rdbB.ClusterNodes(ctx).Val()
+               require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", 
slotNum, srvAID))
+
+               require.NoError(t, rdbA.Do(ctx, "clusterx", "migrate", slotNum, 
srvBID).Err())
+               require.Eventually(t, func() bool {
+                       clusterInfo := 
rdbA.ClusterInfo(context.Background()).Val()
+                       return strings.Contains(clusterInfo, 
fmt.Sprintf("migrating_slot: %d", slotNum)) &&
+                               strings.Contains(clusterInfo, 
fmt.Sprintf("migrating_state: %s", "success"))
+               }, 5*time.Second, 100*time.Millisecond)
 
-               clusterNodes := rdb.ClusterNodes(ctx).Val()
-               require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", 
slotNum, srvID))
                // import success
-               require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", 
slotNum, 1).Val())
-               clusterInfo = rdb.ClusterInfo(ctx).Val()
+               require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", 
slotNum, 1).Val())
+               clusterInfo = rdbB.ClusterInfo(ctx).Val()
                require.Contains(t, clusterInfo, "importing_slot: 1")
                require.Contains(t, clusterInfo, "import_state: success")
 
                // import finish and should not contain the import section
-               clusterNodes = rdb.ClusterNodes(ctx).Val()
-               require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", 
slotNum, srvID))
+               clusterNodes = rdbB.ClusterNodes(ctx).Val()
+               require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", 
slotNum, srvAID))
 
                time.Sleep(50 * time.Millisecond)
-               require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val())
+               require.Equal(t, "slot1", rdbB.Get(ctx, slotKey).Val())
        })
 
        t.Run("IMPORT - slot state 'error'", func(t *testing.T) {
                slotNum := 10
                slotKey := util.SlotTable[slotNum]
 
-               require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
-               require.NoError(t, rdb.Set(ctx, slotKey, "slot10_again", 
0).Err())
-               require.Equal(t, "slot10_again", rdb.Get(ctx, slotKey).Val())
+               require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
+               require.NoError(t, rdbB.Set(ctx, slotKey, "slot10_again", 
0).Err())
+               require.Equal(t, "slot10_again", rdbB.Get(ctx, slotKey).Val())
 
                // import error
-               require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", 
slotNum, 2).Val())
+               require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", 
slotNum, 2).Val())
                time.Sleep(50 * time.Millisecond)
 
-               clusterInfo := rdb.ClusterInfo(ctx).Val()
+               clusterInfo := rdbB.ClusterInfo(ctx).Val()
                require.Contains(t, clusterInfo, "importing_slot: 10")
                require.Contains(t, clusterInfo, "import_state: error")
 
                // get empty
-               require.Zero(t, rdb.Exists(ctx, slotKey).Val())
+               require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
        })
 
        t.Run("IMPORT - connection broken", func(t *testing.T) {
                slotNum := 11
                slotKey := util.SlotTable[slotNum]
-               require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
-               require.NoError(t, rdb.Set(ctx, slotKey, "slot11", 0).Err())
-               require.Equal(t, "slot11", rdb.Get(ctx, slotKey).Val())
+               require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
+               require.NoError(t, rdbB.Set(ctx, slotKey, "slot11", 0).Err())
+               require.Equal(t, "slot11", rdbB.Get(ctx, slotKey).Val())
 
                // close connection, server will stop importing
-               require.NoError(t, rdb.Close())
-               rdb = srv.NewClient()
+               require.NoError(t, rdbB.Close())
+               rdbB = srvB.NewClient()
                time.Sleep(50 * time.Millisecond)
 
-               clusterInfo := rdb.ClusterInfo(ctx).Val()
+               clusterInfo := rdbB.ClusterInfo(ctx).Val()
                require.Contains(t, clusterInfo, "importing_slot: 11")
                require.Contains(t, clusterInfo, "import_state: error")
 
                // get empty
-               require.Zero(t, rdb.Exists(ctx, slotKey).Val())
+               require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
        })
 }

Reply via email to