This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 8542e923 Fix migrator and importer threads not starting when 
`persist-cluster-nodes-enabled` is disabled (#1674)
8542e923 is described below

commit 8542e9239665d4d495af12296def6e7c4f064763
Author: hulk <[email protected]>
AuthorDate: Mon Aug 14 22:37:41 2023 +0800

    Fix migrator and importer threads not starting when 
`persist-cluster-nodes-enabled` is disabled (#1674)
---
 src/server/server.cc                               | 12 +++---
 .../integration/slotmigrate/slotmigrate_test.go    | 45 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/src/server/server.cc b/src/server/server.cc
index 04e349b1..6ab3d018 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -143,15 +143,17 @@ Status Server::Start() {
     }
   }
 
-  if (config_->cluster_enabled && config_->persist_cluster_nodes_enabled) {
-    auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
-    if (!s.IsOK()) {
-      return s.Prefixed("failed to load cluster nodes info");
+  if (config_->cluster_enabled) {
+    if (config_->persist_cluster_nodes_enabled) {
+      auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
+      if (!s.IsOK()) {
+        return s.Prefixed("failed to load cluster nodes info");
+      }
     }
     // Create objects used for slot migration
     slot_migrator =
         std::make_unique<SlotMigrator>(this, config_->migrate_speed, 
config_->pipeline_size, config_->sequence_gap);
-    s = slot_migrator->CreateMigrationThread();
+    auto s = slot_migrator->CreateMigrationThread();
     if (!s.IsOK()) {
       return s.Prefixed("failed to create migration thread");
     }
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go 
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index d21b9cef..11b32fc7 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -263,6 +263,51 @@ func TestSlotMigrateSourceServerFlushedOrKilled(t 
*testing.T) {
        })
 }
 
+func TestSlotMigrateDisablePersistClusterNodes(t *testing.T) {
+       ctx := context.Background()
+
+       srv0 := util.StartServer(t, map[string]string{
+               "cluster-enabled":               "yes",
+               "persist-cluster-nodes-enabled": "no",
+       })
+       defer func() { srv0.Close() }()
+       rdb0 := srv0.NewClient()
+       defer func() { require.NoError(t, rdb0.Close()) }()
+       id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+       srv1 := util.StartServer(t, map[string]string{
+               "cluster-enabled":               "yes",
+               "persist-cluster-nodes-enabled": "no",
+       })
+       defer func() { srv1.Close() }()
+       rdb1 := srv1.NewClient()
+       defer func() { require.NoError(t, rdb1.Close()) }()
+       id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", id0, 
srv0.Host(), srv0.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d master -", id1, srv1.Host(), 
srv1.Port())
+       require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+
+       slot := 1
+       require.NoError(t, rdb0.Del(ctx, util.SlotTable[slot]).Err())
+       require.ErrorContains(t, rdb1.Set(ctx, util.SlotTable[slot], "foobar", 
0).Err(), "MOVED")
+
+       cnt := 100
+       for i := 0; i < cnt; i++ {
+               require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], 
i).Err())
+       }
+       require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, 
id1).Val())
+       waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)
+       require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val())
+
+       k := fmt.Sprintf("{%s}_1", util.SlotTable[slot])
+       require.ErrorContains(t, rdb0.Set(ctx, k, "slot1_value", 0).Err(), 
"MOVED")
+       require.Equal(t, "OK", rdb1.Set(ctx, k, "slot1_value", 0).Val())
+}
+
 func TestSlotMigrateNewNodeAndAuth(t *testing.T) {
        ctx := context.Background()
 

Reply via email to