This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 8542e923 Fix migrator and importer thread didn't start if
`persist-cluster-nodes-enabled` was disabled (#1674)
8542e923 is described below
commit 8542e9239665d4d495af12296def6e7c4f064763
Author: hulk <[email protected]>
AuthorDate: Mon Aug 14 22:37:41 2023 +0800
Fix migrator and importer thread didn't start if
`persist-cluster-nodes-enabled` was disabled (#1674)
---
src/server/server.cc | 12 +++---
.../integration/slotmigrate/slotmigrate_test.go | 45 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 5 deletions(-)
diff --git a/src/server/server.cc b/src/server/server.cc
index 04e349b1..6ab3d018 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -143,15 +143,17 @@ Status Server::Start() {
}
}
- if (config_->cluster_enabled && config_->persist_cluster_nodes_enabled) {
- auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
- if (!s.IsOK()) {
- return s.Prefixed("failed to load cluster nodes info");
+ if (config_->cluster_enabled) {
+ if (config_->persist_cluster_nodes_enabled) {
+ auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to load cluster nodes info");
+ }
}
// Create objects used for slot migration
slot_migrator =
std::make_unique<SlotMigrator>(this, config_->migrate_speed,
config_->pipeline_size, config_->sequence_gap);
- s = slot_migrator->CreateMigrationThread();
+ auto s = slot_migrator->CreateMigrationThread();
if (!s.IsOK()) {
return s.Prefixed("failed to create migration thread");
}
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index d21b9cef..11b32fc7 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -263,6 +263,51 @@ func TestSlotMigrateSourceServerFlushedOrKilled(t *testing.T) {
})
}
+func TestSlotMigrateDisablePersistClusterNodes(t *testing.T) {
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{
+ "cluster-enabled": "yes",
+ "persist-cluster-nodes-enabled": "no",
+ })
+ defer func() { srv0.Close() }()
+ rdb0 := srv0.NewClient()
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{
+ "cluster-enabled": "yes",
+ "persist-cluster-nodes-enabled": "no",
+ })
+ defer func() { srv1.Close() }()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", id0, srv0.Host(), srv0.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d master -", id1, srv1.Host(), srv1.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+ slot := 1
+ require.NoError(t, rdb0.Del(ctx, util.SlotTable[slot]).Err())
+ require.ErrorContains(t, rdb1.Set(ctx, util.SlotTable[slot], "foobar", 0).Err(), "MOVED")
+
+ cnt := 100
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
+ waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)
+ require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val())
+
+ k := fmt.Sprintf("{%s}_1", util.SlotTable[slot])
+ require.ErrorContains(t, rdb0.Set(ctx, k, "slot1_value", 0).Err(), "MOVED")
+ require.Equal(t, "OK", rdb1.Set(ctx, k, "slot1_value", 0).Val())
+}
+
func TestSlotMigrateNewNodeAndAuth(t *testing.T) {
ctx := context.Background()