This is an automated email from the ASF dual-hosted git repository. hulk pushed a commit to branch unstable in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push: new 3e1b21b7 fix(cluster): resolve forbidden slot range cleanup bug during the slot migration (#2829) 3e1b21b7 is described below commit 3e1b21b71dcb5d424c89204d05835a99e5d0fca3 Author: Rivers <riversjin1...@gmail.com> AuthorDate: Mon Mar 17 16:18:19 2025 +0800 fix(cluster): resolve forbidden slot range cleanup bug during the slot migration (#2829) --- src/cluster/cluster.cc | 8 +++++++- tests/gocase/integration/slotmigrate/slotmigrate_test.go | 14 ++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 822a4073..10dbf0e8 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -377,7 +377,13 @@ Status Cluster::ImportSlotRange(redis::Connection *conn, const SlotRange &slot_r LOG(ERROR) << fmt::format("[import] Failed to stop importing slot(s) {}: {}", slot_range.String(), s.Msg()); } }; // Stop forbidding writing slot to accept write commands - if (slot_range == srv_->slot_migrator->GetForbiddenSlotRange()) srv_->slot_migrator->ReleaseForbiddenSlotRange(); + if (slot_range.HasOverlap(srv_->slot_migrator->GetForbiddenSlotRange())) { + // This approach assumes a shard only handles one migration task at a time. + // When executing the import logic, the absence of other outgoing migrations on this shard justifies safely + // removing the forbidden slot. A more robust solution would be required if concurrent slot migrations are + // supported in the future. + srv_->slot_migrator->ReleaseForbiddenSlotRange(); + } LOG(INFO) << fmt::format("[import] Start importing slot(s) {}", slot_range.String()); break; case kImportSuccess: diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index e04acfc4..b57345de 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -1334,6 +1334,19 @@ func TestSlotRangeMigrate(t *testing.T) { require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", "110-112", id1).Err(), errMsg) }) + t.Run("MIGRATE - Migrate back a proper subset range", func(t *testing.T) { + migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "3100-3400") + time.Sleep(1 * time.Second) + migrateSlotRangeAndSetSlot(t, ctx, rdb1, rdb0, id0, "3200-3300") + time.Sleep(1 * time.Second) + + key := "AAA" // CLUSTER KEYSLOT AAA is `3205`, which is in the range of `3200-3500` + require.Equal(t, int64(3205), rdb0.ClusterKeySlot(ctx, key).Val()) + + require.NoError(t, rdb0.Set(ctx, key, "value", 0).Err()) + require.Equal(t, "value", rdb0.Get(ctx, key).Val()) + }) + t.Run("MIGRATE - Failure cases", func(t *testing.T) { largeSlot := 210 for i := 0; i < 20000; i++ { @@ -1347,4 +1360,5 @@ func TestSlotRangeMigrate(t *testing.T) { // TODO: More precise migration failure slot range waitForMigrateSlotRangeState(t, rdb0, "200-220", SlotMigrationStateFailed) }) + }