This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new beb1979f feat(cluster): support migrate slot range (#2389)
beb1979f is described below

commit beb1979f14c5af2ca3b4f7a2f42bd0ce857ab85a
Author: Zhou SiLe <[email protected]>
AuthorDate: Sun Jul 14 21:19:21 2024 +0800

    feat(cluster): support migrate slot range (#2389)
---
 src/cluster/cluster.cc                             | 105 +++++++-----
 src/cluster/cluster.h                              |  11 +-
 src/cluster/cluster_defs.h                         |  35 +++-
 src/cluster/slot_import.cc                         |  49 +++---
 src/cluster/slot_import.h                          |  11 +-
 src/cluster/slot_migrate.cc                        | 150 +++++++++--------
 src/cluster/slot_migrate.h                         |  28 ++--
 src/commands/cmd_cluster.cc                        |  20 ++-
 src/storage/batch_extractor.cc                     |  12 +-
 src/storage/batch_extractor.h                      |   9 +-
 src/storage/iterator.cc                            |   8 +-
 src/storage/iterator.h                             |  12 +-
 src/storage/redis_db.cc                            |  12 +-
 src/storage/redis_db.h                             |   3 +-
 tests/gocase/integration/cluster/cluster_test.go   |   4 +-
 .../integration/slotimport/slotimport_test.go      |  14 +-
 .../integration/slotmigrate/slotmigrate_test.go    | 178 +++++++++++++++++++--
 tests/gocase/unit/geo/geo_test.go                  |  27 ++--
 tests/gocase/unit/type/list/list_test.go           |   3 -
 tests/gocase/unit/type/zset/zset_test.go           |   2 +-
 tests/gocase/util/random.go                        |  40 +++++
 21 files changed, 512 insertions(+), 221 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 8d373e0d..a38c0868 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -115,7 +115,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> 
&slot_ranges, const s
   //  2. Add the slot into to-assign node
   //  3. Update the map of slots to nodes.
   // remember: The atomicity of the process is based on
-  // the transactionality of ClearKeysOfSlot().
+  // the transactionality of ClearKeysOfSlotRange().
   for (auto [s_start, s_end] : slot_ranges) {
     for (int slot = s_start; slot <= s_end; slot++) {
       std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
@@ -129,7 +129,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> 
&slot_ranges, const s
       if (old_node == myself_ && old_node != to_assign_node) {
         // If slot is migrated from this node
         if (migrated_slots_.count(slot) > 0) {
-          auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, 
slot);
+          auto s = 
srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace, 
SlotRange::GetPoint(slot));
           if (!s.ok()) {
             LOG(ERROR) << "failed to clear data of migrated slot: " << 
s.ToString();
           }
@@ -214,7 +214,7 @@ Status Cluster::SetClusterNodes(const std::string 
&nodes_str, int64_t version, b
   if (!migrated_slots_.empty()) {
     for (const auto &[slot, _] : migrated_slots_) {
       if (slots_nodes_[slot] != myself_) {
-        auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
+        auto s = srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace, 
SlotRange::GetPoint(slot));
         if (!s.ok()) {
           LOG(ERROR) << "failed to clear data of migrated slots: " << 
s.ToString();
         }
@@ -258,41 +258,53 @@ Status Cluster::SetMasterSlaveRepl() {
 
 bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role != 
kClusterMaster || srv_->IsSlave(); }
 
-Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {
-  if (!IsValidSlot(slot)) {
-    return {Status::NotOK, errSlotOutOfRange};
+Status Cluster::SetSlotRangeMigrated(const SlotRange &slot_range, const 
std::string &ip_port) {
+  if (!slot_range.IsValid()) {
+    return {Status::NotOK, errSlotRangeInvalid};
   }
 
   // It is called by slot-migrating thread which is an asynchronous thread.
   // Therefore, it should be locked when a record is added to 'migrated_slots_'
   // which will be accessed when executing commands.
   auto exclusivity = srv_->WorkExclusivityGuard();
-  migrated_slots_[slot] = ip_port;
+  for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
+    migrated_slots_[slot] = ip_port;
+  }
   return Status::OK();
 }
 
-Status Cluster::SetSlotImported(int slot) {
-  if (!IsValidSlot(slot)) {
-    return {Status::NotOK, errSlotOutOfRange};
+Status Cluster::SetSlotRangeImported(const SlotRange &slot_range) {
+  if (!slot_range.IsValid()) {
+    return {Status::NotOK, errSlotRangeInvalid};
   }
 
   // It is called by command 'cluster import'. When executing the command, the
   // exclusive lock has been locked. Therefore, it can't be locked again.
-  imported_slots_.insert(slot);
+  for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
+    imported_slots_.insert(slot);
+  }
   return Status::OK();
 }
 
-Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, 
SyncMigrateContext *blocking_ctx) {
+Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const 
std::string &dst_node_id,
+                                 SyncMigrateContext *blocking_ctx) {
   if (nodes_.find(dst_node_id) == nodes_.end()) {
     return {Status::NotOK, "Can't find the destination node id"};
   }
 
-  if (!IsValidSlot(slot)) {
-    return {Status::NotOK, errSlotOutOfRange};
+  if (!slot_range.IsValid()) {
+    return {Status::NotOK, errSlotRangeInvalid};
+  }
+
+  if (!migrated_slots_.empty() &&
+      slot_range.HasOverlap({migrated_slots_.begin()->first, 
migrated_slots_.rbegin()->first})) {
+    return {Status::NotOK, "Can't migrate slot which has been migrated"};
   }
 
-  if (slots_nodes_[slot] != myself_) {
-    return {Status::NotOK, "Can't migrate slot which doesn't belong to me"};
+  for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
+    if (slots_nodes_[slot] != myself_) {
+      return {Status::NotOK, "Can't migrate slot which doesn't belong to me"};
+    }
   }
 
   if (IsNotMaster()) {
@@ -308,51 +320,55 @@ Status Cluster::MigrateSlot(int slot, const std::string 
&dst_node_id, SyncMigrat
   }
 
   const auto &dst = nodes_[dst_node_id];
-  Status s = srv_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host, 
dst->port, slot, blocking_ctx);
+  Status s =
+      srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst->host, 
dst->port, slot_range, blocking_ctx);
   return s;
 }
 
-Status Cluster::ImportSlot(redis::Connection *conn, int slot, int state) {
+Status Cluster::ImportSlotRange(redis::Connection *conn, const SlotRange 
&slot_range, int state) {
   if (IsNotMaster()) {
     return {Status::NotOK, "Slave can't import slot"};
   }
 
-  if (!IsValidSlot(slot)) {
-    return {Status::NotOK, errSlotOutOfRange};
+  if (!slot_range.IsValid()) {
+    return {Status::NotOK, errSlotRangeInvalid};
   }
-  auto source_node = srv_->cluster->slots_nodes_[slot];
-  if (source_node && source_node->id == myid_) {
-    return {Status::NotOK, "Can't import slot which belongs to me"};
+
+  for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
+    auto source_node = srv_->cluster->slots_nodes_[slot];
+    if (source_node && source_node->id == myid_) {
+      return {Status::NotOK, "Can't import slot which belongs to me"};
+    }
   }
 
   Status s;
   switch (state) {
     case kImportStart:
-      s = srv_->slot_import->Start(slot);
+      s = srv_->slot_import->Start(slot_range);
       if (!s.IsOK()) return s;
 
       // Set link importing
       conn->SetImporting();
-      myself_->importing_slot = slot;
+      myself_->importing_slot_range = slot_range;
       // Set link error callback
-      conn->close_cb = [object_ptr = srv_->slot_import.get(), slot](int fd) {
+      conn->close_cb = [object_ptr = srv_->slot_import.get(), slot_range](int 
fd) {
         auto s = object_ptr->StopForLinkError();
         if (!s.IsOK()) {
-          LOG(ERROR) << fmt::format("[import] Failed to stop importing slot 
{}: {}", slot, s.Msg());
+          LOG(ERROR) << fmt::format("[import] Failed to stop importing slot(s) 
{}: {}", slot_range.String(), s.Msg());
         }
       };  // Stop forbidding writing slot to accept write commands
-      if (slot == srv_->slot_migrator->GetForbiddenSlot()) 
srv_->slot_migrator->ReleaseForbiddenSlot();
-      LOG(INFO) << fmt::format("[import] Start importing slot {}", slot);
+      if (slot_range == srv_->slot_migrator->GetForbiddenSlotRange()) 
srv_->slot_migrator->ReleaseForbiddenSlotRange();
+      LOG(INFO) << fmt::format("[import] Start importing slot(s) {}", 
slot_range.String());
       break;
     case kImportSuccess:
-      s = srv_->slot_import->Success(slot);
+      s = srv_->slot_import->Success(slot_range);
       if (!s.IsOK()) return s;
-      LOG(INFO) << fmt::format("[import] Mark the importing slot {} as 
succeed", slot);
+      LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as 
succeed", slot_range.String());
       break;
     case kImportFailed:
-      s = srv_->slot_import->Fail(slot);
+      s = srv_->slot_import->Fail(slot_range);
       if (!s.IsOK()) return s;
-      LOG(INFO) << fmt::format("[import] Mark the importing slot {} as 
failed", slot);
+      LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as 
failed", slot_range.String());
       break;
     default:
       return {Status::NotOK, errInvalidImportState};
@@ -550,15 +566,16 @@ std::string Cluster::genNodesDescription() {
     // Just for MYSELF node to show the importing/migrating slot
     if (node->id == myid_) {
       if (srv_->slot_migrator) {
-        auto migrating_slot = srv_->slot_migrator->GetMigratingSlot();
-        if (migrating_slot != -1) {
-          node_str.append(fmt::format(" [{}->-{}]", migrating_slot, 
srv_->slot_migrator->GetDstNode()));
+        auto migrating_slot_range = 
srv_->slot_migrator->GetMigratingSlotRange();
+        if (migrating_slot_range.IsValid()) {
+          node_str.append(fmt::format(" [{}->-{}]", 
migrating_slot_range.String(), srv_->slot_migrator->GetDstNode()));
         }
       }
       if (srv_->slot_import) {
-        auto importing_slot = srv_->slot_import->GetSlot();
-        if (importing_slot != -1) {
-          node_str.append(fmt::format(" [{}-<-{}]", importing_slot, 
getNodeIDBySlot(importing_slot)));
+        auto importing_slot_range = srv_->slot_import->GetSlotRange();
+        if (importing_slot_range.IsValid()) {
+          node_str.append(
+              fmt::format(" [{}-<-{}]", importing_slot_range.String(), 
getNodeIDBySlot(importing_slot_range.start)));
         }
       }
     }
@@ -802,7 +819,9 @@ Status Cluster::parseClusterNodes(const std::string 
&nodes_str, ClusterNodes *no
   return Status::OK();
 }
 
-bool Cluster::IsWriteForbiddenSlot(int slot) const { return 
srv_->slot_migrator->GetForbiddenSlot() == slot; }
+bool Cluster::IsWriteForbiddenSlot(int slot) const {
+  return srv_->slot_migrator->GetForbiddenSlotRange().Contains(slot);
+}
 
 Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, 
const std::vector<std::string> &cmd_tokens,
                                 redis::Connection *conn) {
@@ -846,7 +865,7 @@ Status Cluster::CanExecByMySelf(const 
redis::CommandAttributes *attributes, cons
     return Status::OK();  // I'm serving this slot
   }
 
-  if (myself_ && myself_->importing_slot == slot &&
+  if (myself_ && myself_->importing_slot_range.Contains(slot) &&
       (conn->IsImporting() || 
conn->IsFlagEnabled(redis::Connection::kAsking))) {
     // While data migrating, the topology of the destination node has not been 
changed.
     // The destination node has to serve the requests from the migrating slot,
@@ -874,10 +893,10 @@ Status Cluster::CanExecByMySelf(const 
redis::CommandAttributes *attributes, cons
 // Only HARD mode is meaningful to the Kvrocks cluster,
 // so it will force clearing all information after resetting.
 Status Cluster::Reset() {
-  if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) {
+  if (srv_->slot_migrator && 
srv_->slot_migrator->GetMigratingSlotRange().IsValid()) {
     return {Status::NotOK, "Can't reset cluster while migrating slot"};
   }
-  if (srv_->slot_import && srv_->slot_import->GetSlot() != -1) {
+  if (srv_->slot_import && srv_->slot_import->GetSlotRange().IsValid()) {
     return {Status::NotOK, "Can't reset cluster while importing slot"};
   }
   if (!srv_->storage->IsEmptyDB()) {
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 335d5ef1..e595666c 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -47,7 +47,7 @@ class ClusterNode {
   std::string master_id;
   std::bitset<kClusterSlots> slots;
   std::vector<std::string> replicas;
-  int importing_slot = -1;
+  SlotRange importing_slot_range = {-1, -1};
 };
 
 struct SlotInfo {
@@ -74,8 +74,8 @@ class Cluster {
   StatusOr<std::string> GetReplicas(const std::string &node_id);
   Status SetNodeId(const std::string &node_id);
   Status SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const 
std::string &node_id, int64_t version);
-  Status SetSlotMigrated(int slot, const std::string &ip_port);
-  Status SetSlotImported(int slot);
+  Status SetSlotRangeMigrated(const SlotRange &slot_range, const std::string 
&ip_port);
+  Status SetSlotRangeImported(const SlotRange &slot_range);
   Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
   Status GetClusterInfo(std::string *cluster_infos);
   int64_t GetVersion() const { return version_; }
@@ -85,8 +85,9 @@ class Cluster {
   Status CanExecByMySelf(const redis::CommandAttributes *attributes, const 
std::vector<std::string> &cmd_tokens,
                          redis::Connection *conn);
   Status SetMasterSlaveRepl();
-  Status MigrateSlot(int slot, const std::string &dst_node_id, 
SyncMigrateContext *blocking_ctx = nullptr);
-  Status ImportSlot(redis::Connection *conn, int slot, int state);
+  Status MigrateSlotRange(const SlotRange &slot_range, const std::string 
&dst_node_id,
+                          SyncMigrateContext *blocking_ctx = nullptr);
+  Status ImportSlotRange(redis::Connection *conn, const SlotRange &slot_range, 
int state);
   std::string GetMyId() const { return myid_; }
   Status DumpClusterNodes(const std::string &file);
   Status LoadClusterNodes(const std::string &file_path);
diff --git a/src/cluster/cluster_defs.h b/src/cluster/cluster_defs.h
index 0178ac75..12fa568d 100644
--- a/src/cluster/cluster_defs.h
+++ b/src/cluster/cluster_defs.h
@@ -21,6 +21,7 @@
 #pragma once
 
 #include "cluster/redis_slot.h"
+#include "fmt/format.h"
 
 enum {
   kClusterMaster = 1,
@@ -33,6 +34,7 @@ enum {
 inline constexpr const char *errInvalidNodeID = "Invalid cluster node id";
 inline constexpr const char *errInvalidSlotID = "Invalid slot id";
 inline constexpr const char *errSlotOutOfRange = "Slot is out of range";
+inline constexpr const char *errSlotRangeInvalid = "Slot range is invalid";
 inline constexpr const char *errInvalidClusterVersion = "Invalid cluster 
version";
 inline constexpr const char *errSlotOverlapped = "Slot distribution is 
overlapped";
 inline constexpr const char *errNoMasterNode = "The node isn't a master";
@@ -40,4 +42,35 @@ inline constexpr const char *errClusterNoInitialized = "The 
cluster is not initi
 inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster 
nodes info";
 inline constexpr const char *errInvalidImportState = "Invalid import state";
 
-using SlotRange = std::pair<int, int>;
+/// SlotRange is a range of cluster slots covering [start, end],
+/// where the valid values for start and end are [0, kClusterSlots).
+/// When both start and end are -1, it usually indicates an empty or wrong 
SlotRange.
+struct SlotRange {
+  SlotRange(int start, int end) : start(start), end(end) {}
+  SlotRange() : start(-1), end(-1) {}
+  bool IsValid() const { return start >= 0 && end >= 0 && start <= end && end 
< kClusterSlots; }
+
+  /// Contains is used to determine whether a slot is within the SlotRange.
+  /// Note that if the SlotRange is invalid, it will always return False.
+  bool Contains(int slot) const { return IsValid() && slot >= start && slot <= 
end; }
+
+  /// HasOverlap is used to determine whether two SlotRanges overlap.
+  /// Note that if either SlotRange is invalid, it will always return False.
+  bool HasOverlap(const SlotRange &rhs) const {
+    return IsValid() && rhs.IsValid() && end >= rhs.start && rhs.end >= start;
+  }
+
+  std::string String() const {
+    if (!IsValid()) return "-1";
+    if (start == end) return fmt::format("{}", start);
+    return fmt::format("{}-{}", start, end);
+  }
+
+  static SlotRange GetPoint(int slot) { return {slot, slot}; }
+
+  bool operator==(const SlotRange &rhs) const { return start == rhs.start && 
end == rhs.end; }
+  bool operator!=(const SlotRange &rhs) const { return !(*this == rhs); }
+
+  int start;
+  int end;
+};
diff --git a/src/cluster/slot_import.cc b/src/cluster/slot_import.cc
index 4306e336..ef04aba7 100644
--- a/src/cluster/slot_import.cc
+++ b/src/cluster/slot_import.cc
@@ -21,59 +21,62 @@
 #include "slot_import.h"
 
 SlotImport::SlotImport(Server *srv)
-    : Database(srv->storage, kDefaultNamespace), srv_(srv), import_slot_(-1), 
import_status_(kImportNone) {
+    : Database(srv->storage, kDefaultNamespace), srv_(srv), 
import_slot_range_(-1, -1), import_status_(kImportNone) {
   std::lock_guard<std::mutex> guard(mutex_);
   // Let metadata_cf_handle_ be nullptr, then get them in real time while use 
them.
   // See comments in SlotMigrator::SlotMigrator for detailed reason.
   metadata_cf_handle_ = nullptr;
 }
 
-Status SlotImport::Start(int slot) {
+Status SlotImport::Start(const SlotRange &slot_range) {
   std::lock_guard<std::mutex> guard(mutex_);
   if (import_status_ == kImportStart) {
     // return ok if the same slot is importing
-    if (import_slot_ == slot) {
+    if (import_slot_range_ == slot_range) {
       return Status::OK();
     }
-    return {Status::NotOK, fmt::format("only one importing slot is allowed, 
current slot is: {}", import_slot_)};
+    return {Status::NotOK,
+            fmt::format("only one importing job is allowed, current importing: 
{}", import_slot_range_.String())};
   }
 
   // Clean slot data first
-  auto s = ClearKeysOfSlot(namespace_, slot);
+  auto s = ClearKeysOfSlotRange(namespace_, slot_range);
   if (!s.ok()) {
-    return {Status::NotOK, fmt::format("clear keys of slot error: {}", 
s.ToString())};
+    return {Status::NotOK, fmt::format("clear keys of slot(s) error: {}", 
s.ToString())};
   }
 
   import_status_ = kImportStart;
-  import_slot_ = slot;
+  import_slot_range_ = slot_range;
   return Status::OK();
 }
 
-Status SlotImport::Success(int slot) {
+Status SlotImport::Success(const SlotRange &slot_range) {
   std::lock_guard<std::mutex> guard(mutex_);
-  if (import_slot_ != slot) {
-    return {Status::NotOK, fmt::format("mismatch slot, importing slot: {}, but 
got: {}", import_slot_, slot)};
+  if (import_slot_range_ != slot_range) {
+    return {Status::NotOK, fmt::format("mismatch slot, importing slot(s): {}, 
but got: {}", import_slot_range_.String(),
+                                       slot_range.String())};
   }
 
-  Status s = srv_->cluster->SetSlotImported(import_slot_);
+  Status s = srv_->cluster->SetSlotRangeImported(import_slot_range_);
   if (!s.IsOK()) {
-    return {Status::NotOK, fmt::format("unable to set imported status: {}", 
slot)};
+    return {Status::NotOK, fmt::format("unable to set imported status: {}", 
slot_range.String())};
   }
 
   import_status_ = kImportSuccess;
   return Status::OK();
 }
 
-Status SlotImport::Fail(int slot) {
+Status SlotImport::Fail(const SlotRange &slot_range) {
   std::lock_guard<std::mutex> guard(mutex_);
-  if (import_slot_ != slot) {
-    return {Status::NotOK, fmt::format("mismatch slot, importing slot: {}, but 
got: {}", import_slot_, slot)};
+  if (import_slot_range_ != slot_range) {
+    return {Status::NotOK, fmt::format("mismatch slot, importing slot(s): {}, 
but got: {}", import_slot_range_.String(),
+                                       slot_range.String())};
   }
 
   // Clean imported slot data
-  auto s = ClearKeysOfSlot(namespace_, slot);
+  auto s = ClearKeysOfSlotRange(namespace_, slot_range);
   if (!s.ok()) {
-    return {Status::NotOK, fmt::format("clear keys of slot error: {}", 
s.ToString())};
+    return {Status::NotOK, fmt::format("clear keys of slot(s) error: {}", 
s.ToString())};
   }
 
   import_status_ = kImportFailed;
@@ -96,7 +99,7 @@ Status SlotImport::StopForLinkError() {
   //    from new master.
   if (!srv_->IsSlave()) {
     // Clean imported slot data
-    auto s = ClearKeysOfSlot(namespace_, import_slot_);
+    auto s = ClearKeysOfSlotRange(namespace_, import_slot_range_);
     if (!s.ok()) {
       return {Status::NotOK, fmt::format("clear keys of slot error: {}", 
s.ToString())};
     }
@@ -106,13 +109,13 @@ Status SlotImport::StopForLinkError() {
   return Status::OK();
 }
 
-int SlotImport::GetSlot() {
+SlotRange SlotImport::GetSlotRange() {
   std::lock_guard<std::mutex> guard(mutex_);
   // import_slot_ only be set when import_status_ is kImportStart
   if (import_status_ != kImportStart) {
-    return -1;
+    return {-1, -1};
   }
-  return import_slot_;
+  return import_slot_range_;
 }
 
 int SlotImport::GetStatus() {
@@ -123,7 +126,7 @@ int SlotImport::GetStatus() {
 void SlotImport::GetImportInfo(std::string *info) {
   std::lock_guard<std::mutex> guard(mutex_);
   info->clear();
-  if (import_slot_ < 0) {
+  if (!import_slot_range_.IsValid()) {
     return;
   }
 
@@ -145,5 +148,5 @@ void SlotImport::GetImportInfo(std::string *info) {
       break;
   }
 
-  *info = fmt::format("importing_slot: {}\r\nimport_state: {}\r\n", 
import_slot_, import_stat);
+  *info = fmt::format("importing_slot(s): {}\r\nimport_state: {}\r\n", 
import_slot_range_.String(), import_stat);
 }
diff --git a/src/cluster/slot_import.h b/src/cluster/slot_import.h
index 9f4bb72d..e3a5d32c 100644
--- a/src/cluster/slot_import.h
+++ b/src/cluster/slot_import.h
@@ -26,6 +26,7 @@
 #include <string>
 #include <vector>
 
+#include "cluster_defs.h"
 #include "config/config.h"
 #include "server/server.h"
 #include "storage/redis_db.h"
@@ -42,17 +43,17 @@ class SlotImport : public redis::Database {
   explicit SlotImport(Server *srv);
   ~SlotImport() = default;
 
-  Status Start(int slot);
-  Status Success(int slot);
-  Status Fail(int slot);
+  Status Start(const SlotRange &slot_range);
+  Status Success(const SlotRange &slot_range);
+  Status Fail(const SlotRange &slot_range);
   Status StopForLinkError();
-  int GetSlot();
+  SlotRange GetSlotRange();
   int GetStatus();
   void GetImportInfo(std::string *info);
 
  private:
   Server *srv_ = nullptr;
   std::mutex mutex_;
-  int import_slot_;
+  SlotRange import_slot_range_;
   int import_status_;
 };
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 03625dc6..f91c9a1a 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -74,20 +74,20 @@ SlotMigrator::SlotMigrator(Server *srv)
   }
 }
 
-Status SlotMigrator::PerformSlotMigration(const std::string &node_id, 
std::string &dst_ip, int dst_port, int slot_id,
-                                          SyncMigrateContext *blocking_ctx) {
+Status SlotMigrator::PerformSlotRangeMigration(const std::string &node_id, 
std::string &dst_ip, int dst_port,
+                                               const SlotRange &slot_range, 
SyncMigrateContext *blocking_ctx) {
+  // TODO: concurrent migration, multiple migration jobs
   // Only one slot migration job at the same time
-  int16_t no_slot = -1;
-  if (!migrating_slot_.compare_exchange_strong(no_slot, 
static_cast<int16_t>(slot_id))) {
-    return {Status::NotOK, "There is already a migrating slot"};
+  SlotRange empty_slot_range = {-1, -1};
+  if (!slot_range_.compare_exchange_strong(empty_slot_range, slot_range)) {
+    return {Status::NotOK, "There is already a migrating job"};
   }
 
-  if (forbidden_slot_ == slot_id) {
+  if (slot_range.HasOverlap(forbidden_slot_range_)) {
     // Have to release migrate slot set above
-    migrating_slot_ = -1;
+    slot_range_ = empty_slot_range;
     return {Status::NotOK, "Can't migrate slot which has been migrated"};
   }
-
   migration_state_ = MigrationState::kStarted;
 
   auto speed = srv_->GetConfig()->migrate_speed;
@@ -115,14 +115,14 @@ Status SlotMigrator::PerformSlotMigration(const 
std::string &node_id, std::strin
   dst_node_ = node_id;
 
   // Create migration job
-  auto job = std::make_unique<SlotMigrationJob>(slot_id, dst_ip, dst_port, 
speed, pipeline_size, seq_gap);
+  auto job = std::make_unique<SlotMigrationJob>(slot_range, dst_ip, dst_port, 
speed, pipeline_size, seq_gap);
   {
     std::lock_guard<std::mutex> guard(job_mutex_);
     migration_job_ = std::move(job);
     job_cv_.notify_one();
   }
 
-  LOG(INFO) << "[migrate] Start migrating slot " << slot_id << " to " << 
dst_ip << ":" << dst_port;
+  LOG(INFO) << "[migrate] Start migrating slot(s) " << slot_range.String() << 
" to " << dst_ip << ":" << dst_port;
 
   return Status::OK();
 }
@@ -159,8 +159,9 @@ void SlotMigrator::loop() {
       return;
     }
 
-    LOG(INFO) << "[migrate] Migrating slot: " << migration_job_->slot_id << ", 
dst_ip: " << migration_job_->dst_ip
-              << ", dst_port: " << migration_job_->dst_port << ", max_speed: " 
<< migration_job_->max_speed
+    LOG(INFO) << "[migrate] Migrating slot(s): " << 
migration_job_->slot_range.String()
+              << ", dst_ip: " << migration_job_->dst_ip << ", dst_port: " << 
migration_job_->dst_port
+              << ", max_speed: " << migration_job_->max_speed
               << ", max_pipeline_size: " << migration_job_->max_pipeline_size;
 
     dst_ip_ = migration_job_->dst_ip;
@@ -187,10 +188,11 @@ void SlotMigrator::runMigrationProcess() {
       case SlotMigrationStage::kStart: {
         auto s = startMigration();
         if (s.IsOK()) {
-          LOG(INFO) << "[migrate] Succeed to start migrating slot " << 
migrating_slot_;
+          LOG(INFO) << "[migrate] Succeed to start migrating slot(s) " << 
slot_range_.load().String();
           current_stage_ = SlotMigrationStage::kSnapshot;
         } else {
-          LOG(ERROR) << "[migrate] Failed to start migrating slot " << 
migrating_slot_ << ". Error: " << s.Msg();
+          LOG(ERROR) << "[migrate] Failed to start migrating slot(s) " << 
slot_range_.load().String()
+                     << ". Error: " << s.Msg();
           current_stage_ = SlotMigrationStage::kFailed;
           resumeSyncCtx(s);
         }
@@ -201,7 +203,8 @@ void SlotMigrator::runMigrationProcess() {
         if (s.IsOK()) {
           current_stage_ = SlotMigrationStage::kWAL;
         } else {
-          LOG(ERROR) << "[migrate] Failed to send snapshot of slot " << 
migrating_slot_ << ". Error: " << s.Msg();
+          LOG(ERROR) << "[migrate] Failed to send snapshot of slot(s) " << 
slot_range_.load().String()
+                     << ". Error: " << s.Msg();
           current_stage_ = SlotMigrationStage::kFailed;
           resumeSyncCtx(s);
         }
@@ -210,10 +213,11 @@ void SlotMigrator::runMigrationProcess() {
       case SlotMigrationStage::kWAL: {
         auto s = syncWAL();
         if (s.IsOK()) {
-          LOG(INFO) << "[migrate] Succeed to sync from WAL for a slot " << 
migrating_slot_;
+          LOG(INFO) << "[migrate] Succeed to sync from WAL for slot(s) " << 
slot_range_.load().String();
           current_stage_ = SlotMigrationStage::kSuccess;
         } else {
-          LOG(ERROR) << "[migrate] Failed to sync from WAL for a slot " << 
migrating_slot_ << ". Error: " << s.Msg();
+          LOG(ERROR) << "[migrate] Failed to sync from WAL for slot(s) " << 
slot_range_.load().String()
+                     << ". Error: " << s.Msg();
           current_stage_ = SlotMigrationStage::kFailed;
           resumeSyncCtx(s);
         }
@@ -222,12 +226,12 @@ void SlotMigrator::runMigrationProcess() {
       case SlotMigrationStage::kSuccess: {
         auto s = finishSuccessfulMigration();
         if (s.IsOK()) {
-          LOG(INFO) << "[migrate] Succeed to migrate slot " << migrating_slot_;
+          LOG(INFO) << "[migrate] Succeed to migrate slot(s) " << 
slot_range_.load().String();
           current_stage_ = SlotMigrationStage::kClean;
           migration_state_ = MigrationState::kSuccess;
           resumeSyncCtx(s);
         } else {
-          LOG(ERROR) << "[migrate] Failed to finish a successful migration of 
slot " << migrating_slot_
+          LOG(ERROR) << "[migrate] Failed to finish a successful migration of 
slot(s) " << slot_range_.load().String()
                      << ". Error: " << s.Msg();
           current_stage_ = SlotMigrationStage::kFailed;
           resumeSyncCtx(s);
@@ -237,10 +241,10 @@ void SlotMigrator::runMigrationProcess() {
       case SlotMigrationStage::kFailed: {
         auto s = finishFailedMigration();
         if (!s.IsOK()) {
-          LOG(ERROR) << "[migrate] Failed to finish a failed migration of slot 
" << migrating_slot_
+          LOG(ERROR) << "[migrate] Failed to finish a failed migration of 
slot(s) " << slot_range_.load().String()
                      << ". Error: " << s.Msg();
         }
-        LOG(INFO) << "[migrate] Failed to migrate a slot" << migrating_slot_;
+        LOG(INFO) << "[migrate] Failed to migrate a slot(s) " << 
slot_range_.load().String();
         migration_state_ = MigrationState::kFailed;
         current_stage_ = SlotMigrationStage::kClean;
         break;
@@ -302,7 +306,8 @@ Status SlotMigrator::startMigration() {
     }
   }
 
-  LOG(INFO) << "[migrate] Start migrating slot " << migrating_slot_ << ", 
connect destination fd " << *dst_fd_;
+  LOG(INFO) << "[migrate] Start migrating slot(s) " << 
slot_range_.load().String() << ", connect destination fd "
+            << *dst_fd_;
 
   return Status::OK();
 }
@@ -330,13 +335,13 @@ Status SlotMigrator::sendSnapshotByCmd() {
   uint64_t expired_key_cnt = 0;
   uint64_t empty_key_cnt = 0;
   std::string restore_cmds;
-  int16_t slot = migrating_slot_;
+  SlotRange slot_range = slot_range_;
 
-  LOG(INFO) << "[migrate] Start migrating snapshot of slot " << slot;
+  LOG(INFO) << "[migrate] Start migrating snapshot of slot(s)" << 
slot_range.String();
 
   // Construct key prefix to iterate the keys belong to the target slot
-  std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
-  LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
+  std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
+  LOG(INFO) << "[migrate] Iterate keys of slot(s), key's prefix: " << prefix;
 
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
@@ -346,6 +351,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
   auto iter = 
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
 
   // Seek to the beginning of keys start with 'prefix' and iterate all these 
keys
+  auto current_slot = slot_range.start;
   for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
     // The migrating task has to be stopped, if server role is changed from 
master to slave
     // or flush command (flushdb or flushall) is executed
@@ -354,7 +360,8 @@ Status SlotMigrator::sendSnapshotByCmd() {
     }
 
     // Iteration is out of range
-    if (!iter->key().starts_with(prefix)) {
+    current_slot = ExtractSlotId(iter->key());
+    if (!slot_range.Contains(current_slot)) {
       break;
     }
 
@@ -384,8 +391,8 @@ Status SlotMigrator::sendSnapshotByCmd() {
 
   if (auto s = iter->status(); !s.ok()) {
     auto err_str = s.ToString();
-    LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << slot << ": " 
<< err_str;
-    return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: 
{}", slot, err_str)};
+    LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << current_slot 
<< ": " << err_str;
+    return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: 
{}", current_slot, err_str)};
   }
 
   // It's necessary to send commands that are still in the pipeline since the 
final pipeline may not be sent
@@ -395,8 +402,9 @@ Status SlotMigrator::sendSnapshotByCmd() {
     return s.Prefixed(errFailedToSendCommands);
   }
 
-  LOG(INFO) << "[migrate] Succeed to migrate slot snapshot, slot: " << slot << 
", Migrated keys: " << migrated_key_cnt
-            << ", Expired keys: " << expired_key_cnt << ", Empty keys: " << 
empty_key_cnt;
+  LOG(INFO) << "[migrate] Succeed to migrate slot(s) snapshot, slot(s): " << 
slot_range.String()
+            << ", Migrated keys: " << migrated_key_cnt << ", Expired keys: " 
<< expired_key_cnt
+            << ", Empty keys: " << empty_key_cnt;
 
   return Status::OK();
 }
@@ -408,7 +416,7 @@ Status SlotMigrator::syncWALByCmd() {
     return s.Prefixed("failed to sync WAL before forbidding a slot");
   }
 
-  setForbiddenSlot(migrating_slot_);
+  setForbiddenSlotRange(slot_range_);
 
   // Send last incremental data
   s = syncWalAfterForbiddingSlot();
@@ -431,20 +439,21 @@ Status SlotMigrator::finishSuccessfulMigration() {
   }
 
   std::string dst_ip_port = dst_ip_ + ":" + std::to_string(dst_port_);
-  s = srv_->cluster->SetSlotMigrated(migrating_slot_, dst_ip_port);
+  s = srv_->cluster->SetSlotRangeMigrated(slot_range_, dst_ip_port);
   if (!s.IsOK()) {
-    return s.Prefixed(fmt::format("failed to set slot {} as migrated to {}", 
migrating_slot_.load(), dst_ip_port));
+    return s.Prefixed(
+        fmt::format("failed to set slot(s) {} as migrated to {}", 
slot_range_.load().String(), dst_ip_port));
   }
 
-  migrate_failed_slot_ = -1;
+  migrate_failed_slot_range_ = {-1, -1};
 
   return Status::OK();
 }
 
 Status SlotMigrator::finishFailedMigration() {
   // Stop slot will forbid writing
-  migrate_failed_slot_ = migrating_slot_;
-  forbidden_slot_ = -1;
+  migrate_failed_slot_range_ = slot_range_.load();
+  forbidden_slot_range_ = {-1, -1};
 
   // Set import status on the destination node to FAILED
   auto s = setImportStatusOnDstNode(*dst_fd_, kImportFailed);
@@ -456,7 +465,7 @@ Status SlotMigrator::finishFailedMigration() {
 }
 
 void SlotMigrator::clean() {
-  LOG(INFO) << "[migrate] Clean resources of migrating slot " << 
migrating_slot_;
+  LOG(INFO) << "[migrate] Clean resources of migrating slot(s) " << 
slot_range_.load().String();
   if (slot_snapshot_) {
     storage_->GetDB()->ReleaseSnapshot(slot_snapshot_);
     slot_snapshot_ = nullptr;
@@ -468,7 +477,7 @@ void SlotMigrator::clean() {
   std::lock_guard<std::mutex> guard(job_mutex_);
   migration_job_.reset();
   dst_fd_.Reset();
-  migrating_slot_ = -1;
+  slot_range_ = {-1, -1};
   SetStopMigrationFlag(false);
 }
 
@@ -491,7 +500,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, 
int status) {
   if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};
 
   std::string cmd =
-      redis::ArrayOfBulkStrings({"cluster", "import", 
std::to_string(migrating_slot_), std::to_string(status)});
+      redis::ArrayOfBulkStrings({"cluster", "import", 
slot_range_.load().String(), std::to_string(status)});
   auto s = util::SockSend(sock_fd, cmd);
   if (!s.IsOK()) {
     return s.Prefixed("failed to send command to the destination node");
@@ -989,21 +998,21 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string 
*commands, bool need) {
   return Status::OK();
 }
 
-void SlotMigrator::setForbiddenSlot(int16_t slot) {
-  LOG(INFO) << "[migrate] Setting forbidden slot " << slot;
+void SlotMigrator::setForbiddenSlotRange(const SlotRange &slot_range) {
+  LOG(INFO) << "[migrate] Setting forbidden slot(s) " << slot_range.String();
   // Block server to set forbidden slot
   uint64_t during = util::GetTimeStampUS();
   {
     auto exclusivity = srv_->WorkExclusivityGuard();
-    forbidden_slot_ = slot;
+    forbidden_slot_range_ = slot_range;
   }
   during = util::GetTimeStampUS() - during;
   LOG(INFO) << "[migrate] To set forbidden slot, server was blocked for " << 
during << "us";
 }
 
-void SlotMigrator::ReleaseForbiddenSlot() {
-  LOG(INFO) << "[migrate] Release forbidden slot " << forbidden_slot_;
-  forbidden_slot_ = -1;
+void SlotMigrator::ReleaseForbiddenSlotRange() {
+  LOG(INFO) << "[migrate] Release forbidden slot(s) " << 
forbidden_slot_range_.load().String();
+  forbidden_slot_range_ = {-1, -1};
 }
 
 void SlotMigrator::applyMigrationSpeedLimit() const {
@@ -1023,7 +1032,7 @@ void SlotMigrator::applyMigrationSpeedLimit() const {
 
 Status SlotMigrator::generateCmdsFromBatch(rocksdb::BatchResult *batch, 
std::string *commands) {
   // Iterate batch to get keys and construct commands for keys
-  WriteBatchExtractor write_batch_extractor(storage_->IsSlotIdEncoded(), 
migrating_slot_, false);
+  WriteBatchExtractor write_batch_extractor(storage_->IsSlotIdEncoded(), 
slot_range_, false);
   rocksdb::Status status = 
batch->writeBatchPtr->Iterate(&write_batch_extractor);
   if (!status.ok()) {
     LOG(ERROR) << "[migrate] Failed to parse write batch, Err: " << 
status.ToString();
@@ -1163,11 +1172,12 @@ Status SlotMigrator::syncWalAfterForbiddingSlot() {
 
 void SlotMigrator::GetMigrationInfo(std::string *info) const {
   info->clear();
-  if (migrating_slot_ < 0 && forbidden_slot_ < 0 && migrate_failed_slot_ < 0) {
+  if (!slot_range_.load().IsValid() && !forbidden_slot_range_.load().IsValid() 
&&
+      !migrate_failed_slot_range_.load().IsValid()) {
     return;
   }
 
-  int16_t slot = -1;
+  SlotRange slot_range;
   std::string task_state;
 
   switch (migration_state_.load()) {
@@ -1176,22 +1186,22 @@ void SlotMigrator::GetMigrationInfo(std::string *info) 
const {
       break;
     case MigrationState::kStarted:
       task_state = "start";
-      slot = migrating_slot_;
+      slot_range = slot_range_;
       break;
     case MigrationState::kSuccess:
       task_state = "success";
-      slot = forbidden_slot_;
+      slot_range = forbidden_slot_range_;
       break;
     case MigrationState::kFailed:
       task_state = "fail";
-      slot = migrate_failed_slot_;
+      slot_range = migrate_failed_slot_range_;
       break;
     default:
       break;
   }
 
-  *info =
-      fmt::format("migrating_slot: {}\r\ndestination_node: 
{}\r\nmigrating_state: {}\r\n", slot, dst_node_, task_state);
+  *info = fmt::format("migrating_slot(s): {}\r\ndestination_node: 
{}\r\nmigrating_state: {}\r\n", slot_range.String(),
+                      dst_node_, task_state);
 }
 
 void SlotMigrator::CancelSyncCtx() {
@@ -1217,9 +1227,10 @@ Status SlotMigrator::sendMigrationBatch(BatchSender 
*batch) {
 
 Status SlotMigrator::sendSnapshotByRawKV() {
   uint64_t start_ts = util::GetTimeStampMS();
-  LOG(INFO) << "[migrate] Migrating snapshot of slot " << migrating_slot_ << " 
by raw key value";
+  auto slot_range = slot_range_.load();
+  LOG(INFO) << "[migrate] Migrating snapshot of slot(s) " << 
slot_range.String() << " by raw key value";
 
-  auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
+  auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
   rocksdb::Slice prefix_slice(prefix);
@@ -1228,7 +1239,13 @@ Status SlotMigrator::sendSnapshotByRawKV() {
 
   BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, 
migrate_batch_bytes_per_sec_);
 
-  for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); 
iter.Next()) {
+  for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
+    // Iteration is out of range
+    auto key_slot_id = ExtractSlotId(iter.Key());
+    if (!slot_range.Contains(key_slot_id)) {
+      break;
+    }
+
     auto redis_type = iter.Type();
     std::string log_data;
     if (redis_type == RedisType::kRedisList) {
@@ -1273,9 +1290,9 @@ Status SlotMigrator::sendSnapshotByRawKV() {
 
   auto elapsed = util::GetTimeStampMS() - start_ts;
   LOG(INFO) << fmt::format(
-      "[migrate] Succeed to migrate snapshot, slot: {}, elapsed: {} ms, "
+      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} 
ms, "
       "sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}",
-      migrating_slot_.load(), elapsed, batch_sender.GetSentBytes(), 
batch_sender.GetRate(start_ts),
+      slot_range.String(), elapsed, batch_sender.GetSentBytes(), 
batch_sender.GetRate(start_ts),
       batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
 
   return Status::OK();
@@ -1283,7 +1300,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {
 
 Status SlotMigrator::syncWALByRawKV() {
   uint64_t start_ts = util::GetTimeStampMS();
-  LOG(INFO) << "[migrate] Syncing WAL of slot " << migrating_slot_ << " by raw 
key value";
+  LOG(INFO) << "[migrate] Syncing WAL of slot(s) " << 
slot_range_.load().String() << " by raw key value";
   BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, 
migrate_batch_bytes_per_sec_);
 
   int epoch = 1;
@@ -1304,7 +1321,7 @@ Status SlotMigrator::syncWALByRawKV() {
     epoch++;
   }
 
-  setForbiddenSlot(migrating_slot_);
+  setForbiddenSlotRange(slot_range_);
 
   wal_incremental_seq = storage_->GetDB()->GetLatestSequenceNumber();
   if (wal_incremental_seq > wal_begin_seq_) {
@@ -1318,9 +1335,9 @@ Status SlotMigrator::syncWALByRawKV() {
 
   auto elapsed = util::GetTimeStampMS() - start_ts;
   LOG(INFO) << fmt::format(
-      "[migrate] Succeed to migrate incremental data, slot: {}, elapsed: {} 
ms, "
+      "[migrate] Succeed to migrate incremental data, slot(s): {}, elapsed: {} 
ms, "
       "sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}",
-      migrating_slot_.load(), elapsed, batch_sender.GetSentBytes(), 
batch_sender.GetRate(start_ts),
+      slot_range_.load().String(), elapsed, batch_sender.GetSentBytes(), 
batch_sender.GetRate(start_ts),
       batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
 
   return Status::OK();
@@ -1329,15 +1346,16 @@ Status SlotMigrator::syncWALByRawKV() {
 bool SlotMigrator::catchUpIncrementalWAL() {
   uint64_t gap = storage_->GetDB()->GetLatestSequenceNumber() - wal_begin_seq_;
   if (gap <= seq_gap_limit_) {
-    LOG(INFO) << fmt::format("[migrate] Incremental data sequence gap: {}, 
less than limit: {}, set forbidden slot: {}",
-                             gap, seq_gap_limit_, migrating_slot_.load());
+    LOG(INFO) << fmt::format(
+        "[migrate] Incremental data sequence gap: {}, less than limit: {}, set 
forbidden slot(s): {}", gap,
+        seq_gap_limit_, slot_range_.load().String());
     return true;
   }
   return false;
 }
 
 Status SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, 
BatchSender *batch_sender) {
-  engine::WALIterator wal_iter(storage_, migrating_slot_);
+  engine::WALIterator wal_iter(storage_, slot_range_);
   uint64_t start_seq = wal_begin_seq_ + 1;
   for (wal_iter.Seek(start_seq); wal_iter.Valid(); wal_iter.Next()) {
     if (wal_iter.NextSequenceNumber() > end_seq + 1) {
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index e22ba47d..179150b8 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -55,8 +55,9 @@ enum class SlotMigrationStage { kNone, kStart, kSnapshot, 
kWAL, kSuccess, kFaile
 enum class KeyMigrationResult { kMigrated, kExpired, kUnderlyingStructEmpty };
 
 struct SlotMigrationJob {
-  SlotMigrationJob(int slot_id, std::string dst_ip, int dst_port, int speed, 
int pipeline_size, int seq_gap)
-      : slot_id(static_cast<int16_t>(slot_id)),
+  SlotMigrationJob(const SlotRange &slot_range_in, std::string dst_ip, int 
dst_port, int speed, int pipeline_size,
+                   int seq_gap)
+      : slot_range(slot_range_in),
         dst_ip(std::move(dst_ip)),
         dst_port(dst_port),
         max_speed(speed),
@@ -66,7 +67,7 @@ struct SlotMigrationJob {
   SlotMigrationJob &operator=(const SlotMigrationJob &other) = delete;
   ~SlotMigrationJob() = default;
 
-  int16_t slot_id;
+  SlotRange slot_range;
   std::string dst_ip;
   int dst_port;
   int max_speed;
@@ -84,9 +85,9 @@ class SlotMigrator : public redis::Database {
   ~SlotMigrator();
 
   Status CreateMigrationThread();
-  Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, 
int dst_port, int slot_id,
-                              SyncMigrateContext *blocking_ctx = nullptr);
-  void ReleaseForbiddenSlot();
+  Status PerformSlotRangeMigration(const std::string &node_id, std::string 
&dst_ip, int dst_port,
+                                   const SlotRange &range, SyncMigrateContext 
*blocking_ctx = nullptr);
+  void ReleaseForbiddenSlotRange();
   void SetMaxMigrationSpeed(int value) {
     if (value >= 0) max_migration_speed_ = value;
   }
@@ -101,8 +102,8 @@ class SlotMigrator : public redis::Database {
   void SetStopMigrationFlag(bool value) { stop_migration_ = value; }
   bool IsMigrationInProgress() const { return migration_state_ == 
MigrationState::kStarted; }
   SlotMigrationStage GetCurrentSlotMigrationStage() const { return 
current_stage_; }
-  int16_t GetForbiddenSlot() const { return forbidden_slot_; }
-  int16_t GetMigratingSlot() const { return migrating_slot_; }
+  SlotRange GetForbiddenSlotRange() const { return forbidden_slot_range_; }
+  SlotRange GetMigratingSlotRange() const { return slot_range_; }
   std::string GetDstNode() const { return dst_node_; }
   void GetMigrationInfo(std::string *info) const;
   void CancelSyncCtx();
@@ -149,7 +150,7 @@ class SlotMigrator : public redis::Database {
   bool catchUpIncrementalWAL();
   Status migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender 
*batch_sender);
 
-  void setForbiddenSlot(int16_t slot);
+  void setForbiddenSlotRange(const SlotRange &slot_range);
   std::unique_lock<std::mutex> blockingLock() { return 
std::unique_lock<std::mutex>(blocking_mutex_); }
 
   void resumeSyncCtx(const Status &migrate_result);
@@ -190,9 +191,12 @@ class SlotMigrator : public redis::Database {
   UniqueFD dst_fd_;
 
   MigrationType migration_type_ = MigrationType::kRedisCommand;
-  std::atomic<int16_t> forbidden_slot_ = -1;
-  std::atomic<int16_t> migrating_slot_ = -1;
-  int16_t migrate_failed_slot_ = -1;
+
+  static_assert(std::atomic<SlotRange>::is_always_lock_free, "SlotRange is not 
lock free.");
+  std::atomic<SlotRange> forbidden_slot_range_ = SlotRange{-1, -1};
+  std::atomic<SlotRange> slot_range_ = SlotRange{-1, -1};
+  std::atomic<SlotRange> migrate_failed_slot_range_ = SlotRange{-1, -1};
+
   std::atomic<bool> stop_migration_ = false;  // if is true migration will be 
stopped but the thread won't be destroyed
   const rocksdb::Snapshot *slot_snapshot_ = nullptr;
   uint64_t wal_begin_seq_ = 0;
diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc
index 0f9b1760..e4fb2ded 100644
--- a/src/commands/cmd_cluster.cc
+++ b/src/commands/cmd_cluster.cc
@@ -47,7 +47,11 @@ class CommandCluster : public Commander {
 
     if (subcommand_ == "import") {
       if (args.size() != 4) return {Status::RedisParseErr, 
errWrongNumOfArguments};
-      slot_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10));
+
+      Status s = CommandTable::ParseSlotRanges(args_[2], slot_ranges_);
+      if (!s.IsOK()) {
+        return s;
+      }
 
       auto state = ParseInt<unsigned>(args[3], {kImportStart, kImportNone}, 
10);
       if (!state) return {Status::NotOK, "Invalid import state"};
@@ -109,7 +113,8 @@ class CommandCluster : public Commander {
         return s;
       }
     } else if (subcommand_ == "import") {
-      Status s = srv->cluster->ImportSlot(conn, static_cast<int>(slot_), 
state_);
+      // TODO: support multiple slot ranges
+      Status s = srv->cluster->ImportSlotRange(conn, slot_ranges_[0], state_);
       if (s.IsOK()) {
         *output = redis::SimpleString("OK");
       } else {
@@ -138,7 +143,7 @@ class CommandCluster : public Commander {
 
  private:
   std::string subcommand_;
-  int64_t slot_ = -1;
+  std::vector<SlotRange> slot_ranges_;
   ImportStatus state_ = kImportNone;
 };
 
@@ -154,7 +159,10 @@ class CommandClusterX : public Commander {
     if (subcommand_ == "migrate") {
       if (args.size() < 4 || args.size() > 6) return {Status::RedisParseErr, 
errWrongNumOfArguments};
 
-      slot_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10));
+      Status s = CommandTable::ParseSlotRanges(args_[2], slot_ranges_);
+      if (!s.IsOK()) {
+        return s;
+      }
 
       dst_node_id_ = args[3];
 
@@ -279,8 +287,8 @@ class CommandClusterX : public Commander {
       if (sync_migrate_) {
         sync_migrate_ctx_ = std::make_unique<SyncMigrateContext>(srv, conn, 
sync_migrate_timeout_);
       }
-
-      Status s = srv->cluster->MigrateSlot(static_cast<int>(slot_), 
dst_node_id_, sync_migrate_ctx_.get());
+      // TODO: support multiple slot ranges
+      Status s = srv->cluster->MigrateSlotRange(slot_ranges_[0], dst_node_id_, 
sync_migrate_ctx_.get());
       if (s.IsOK()) {
         if (sync_migrate_) {
           return {Status::BlockingCmd};
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index c1559b1a..e9ac4925 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -53,7 +53,8 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
 
   if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Metadata)) {
     std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key, 
is_slot_id_encoded_);
-    if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != 
GetSlotIdFromKey(user_key)) {
+    auto key_slot_id = GetSlotIdFromKey(user_key);
+    if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
       return rocksdb::Status::OK();
     }
 
@@ -112,7 +113,8 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
   if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
     InternalKey ikey(key, is_slot_id_encoded_);
     user_key = ikey.GetKey().ToString();
-    if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != 
GetSlotIdFromKey(user_key)) {
+    auto key_slot_id = GetSlotIdFromKey(user_key);
+    if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
       return rocksdb::Status::OK();
     }
 
@@ -279,7 +281,8 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
     std::string user_key;
     std::tie(ns, user_key) = ExtractNamespaceKey<std::string>(key, 
is_slot_id_encoded_);
 
-    if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != 
GetSlotIdFromKey(user_key)) {
+    auto key_slot_id = GetSlotIdFromKey(user_key);
+    if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
       return rocksdb::Status::OK();
     }
 
@@ -287,7 +290,8 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
   } else if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::PrimarySubkey)) {
     InternalKey ikey(key, is_slot_id_encoded_);
     std::string user_key = ikey.GetKey().ToString();
-    if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != 
GetSlotIdFromKey(user_key)) {
+    auto key_slot_id = GetSlotIdFromKey(user_key);
+    if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
       return rocksdb::Status::OK();
     }
 
diff --git a/src/storage/batch_extractor.h b/src/storage/batch_extractor.h
index 694cc3bf..4f577504 100644
--- a/src/storage/batch_extractor.h
+++ b/src/storage/batch_extractor.h
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include "cluster/cluster_defs.h"
 #include "redis_db.h"
 #include "redis_metadata.h"
 #include "status.h"
@@ -32,8 +33,10 @@
 // An extractor to extract update from raw write batch
 class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
  public:
-  explicit WriteBatchExtractor(bool is_slot_id_encoded, int16_t slot_id = -1, 
bool to_redis = false)
-      : is_slot_id_encoded_(is_slot_id_encoded), slot_id_(slot_id), 
to_redis_(to_redis) {}
+  explicit WriteBatchExtractor(bool is_slot_id_encoded, int slot = -1, bool 
to_redis = false)
+      : is_slot_id_encoded_(is_slot_id_encoded), slot_range_(slot, slot), 
to_redis_(to_redis) {}
+  explicit WriteBatchExtractor(bool is_slot_id_encoded, const SlotRange 
&slot_range, bool to_redis = false)
+      : is_slot_id_encoded_(is_slot_id_encoded), slot_range_(slot_range), 
to_redis_(to_redis) {}
 
   void LogData(const rocksdb::Slice &blob) override;
   rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const 
Slice &value) override;
@@ -49,6 +52,6 @@ class WriteBatchExtractor : public 
rocksdb::WriteBatch::Handler {
   redis::WriteBatchLogData log_data_;
   bool first_seen_ = true;
   bool is_slot_id_encoded_ = false;
-  int slot_id_;
+  SlotRange slot_range_;
   bool to_redis_;
 };
diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc
index c65dd3ab..e368c679 100644
--- a/src/storage/iterator.cc
+++ b/src/storage/iterator.cc
@@ -167,7 +167,8 @@ void SubKeyIterator::Reset() {
 }
 
 rocksdb::Status WALBatchExtractor::PutCF(uint32_t column_family_id, const 
Slice &key, const Slice &value) {
-  if (slot_ != -1 && slot_ != ExtractSlotId(key)) {
+  auto key_slot_id = ExtractSlotId(key);
+  if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
     return rocksdb::Status::OK();
   }
   items_.emplace_back(WALItem::Type::kTypePut, column_family_id, 
key.ToString(), value.ToString());
@@ -175,7 +176,8 @@ rocksdb::Status WALBatchExtractor::PutCF(uint32_t 
column_family_id, const Slice
 }
 
 rocksdb::Status WALBatchExtractor::DeleteCF(uint32_t column_family_id, const 
rocksdb::Slice &key) {
-  if (slot_ != -1 && slot_ != ExtractSlotId(key)) {
+  auto key_slot_id = ExtractSlotId(key);
+  if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
     return rocksdb::Status::OK();
   }
   items_.emplace_back(WALItem::Type::kTypeDelete, column_family_id, 
key.ToString(), std::string{});
@@ -245,7 +247,7 @@ void WALIterator::nextBatch() {
 }
 
 void WALIterator::Seek(rocksdb::SequenceNumber seq) {
-  if (slot_ != -1 && !storage_->IsSlotIdEncoded()) {
+  if (slot_range_.IsValid() && !storage_->IsSlotIdEncoded()) {
     Reset();
     return;
   }
diff --git a/src/storage/iterator.h b/src/storage/iterator.h
index 2f123630..34f96f6f 100644
--- a/src/storage/iterator.h
+++ b/src/storage/iterator.h
@@ -22,6 +22,7 @@
 #include <rocksdb/iterator.h>
 #include <rocksdb/options.h>
 
+#include "cluster/cluster_defs.h"
 #include "storage.h"
 
 namespace engine {
@@ -102,7 +103,8 @@ struct WALItem {
 class WALBatchExtractor : public rocksdb::WriteBatch::Handler {
  public:
   // If set slot, storage must enable slot id encoding
-  explicit WALBatchExtractor(int slot = -1) : slot_(slot) {}
+  explicit WALBatchExtractor(int slot = -1) : slot_range_(slot, slot) {}
+  explicit WALBatchExtractor(const SlotRange &slot_range) : 
slot_range_(slot_range) {}
 
   rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const 
Slice &value) override;
 
@@ -133,13 +135,15 @@ class WALBatchExtractor : public 
rocksdb::WriteBatch::Handler {
 
  private:
   std::vector<WALItem> items_;
-  int slot_;
+  SlotRange slot_range_;
 };
 
 class WALIterator {
  public:
+  explicit WALIterator(engine::Storage *storage, const SlotRange &slot_range)
+      : storage_(storage), slot_range_(slot_range), extractor_(slot_range), 
next_batch_seq_(0){};
   explicit WALIterator(engine::Storage *storage, int slot = -1)
-      : storage_(storage), slot_(slot), extractor_(slot), next_batch_seq_(0){};
+      : storage_(storage), slot_range_(slot, slot), extractor_(slot), 
next_batch_seq_(0){};
   ~WALIterator() = default;
 
   bool Valid() const;
@@ -154,7 +158,7 @@ class WALIterator {
   void nextBatch();
 
   engine::Storage *storage_;
-  int slot_;
+  SlotRange slot_range_;
 
   std::unique_ptr<rocksdb::TransactionLogIterator> iter_;
   WALBatchExtractor extractor_;
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 0fd31137..b8c46458 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -529,18 +529,14 @@ std::string Database::AppendNamespacePrefix(const Slice 
&user_key) {
   return ComposeNamespaceKey(namespace_, user_key, 
storage_->IsSlotIdEncoded());
 }
 
-rocksdb::Status Database::ClearKeysOfSlot(const rocksdb::Slice &ns, int slot) {
+rocksdb::Status Database::ClearKeysOfSlotRange(const rocksdb::Slice &ns, const 
SlotRange &slot_range) {
   if (!storage_->IsSlotIdEncoded()) {
     return rocksdb::Status::Aborted("It is not in cluster mode");
   }
 
-  std::string prefix = ComposeSlotKeyPrefix(ns, slot);
-  std::string prefix_end = ComposeSlotKeyPrefix(ns, slot + 1);
-  auto s = storage_->DeleteRange(prefix, prefix_end);
-  if (!s.ok()) {
-    return s;
-  }
-  return rocksdb::Status::OK();
+  std::string prefix = ComposeSlotKeyPrefix(ns, slot_range.start);
+  std::string prefix_end = ComposeSlotKeyPrefix(ns, slot_range.end + 1);
+  return storage_->DeleteRange(prefix, prefix_end);
 }
 
 rocksdb::Status Database::KeyExist(const std::string &key) {
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 12d9896b..289d2a2a 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -27,6 +27,7 @@
 #include <variant>
 #include <vector>
 
+#include "cluster/cluster_defs.h"
 #include "redis_metadata.h"
 #include "server/redis_reply.h"
 #include "storage.h"
@@ -134,7 +135,7 @@ class Database {
                                      RedisType type = kRedisNone);
   [[nodiscard]] rocksdb::Status RandomKey(const std::string &cursor, 
std::string *key);
   std::string AppendNamespacePrefix(const Slice &user_key);
-  [[nodiscard]] rocksdb::Status ClearKeysOfSlot(const rocksdb::Slice &ns, int 
slot);
+  [[nodiscard]] rocksdb::Status ClearKeysOfSlotRange(const rocksdb::Slice &ns, 
const SlotRange &slot_range);
   [[nodiscard]] rocksdb::Status KeyExist(const std::string &key);
 
   // Copy <key,value> to <new_key,value> (already an internal key)
diff --git a/tests/gocase/integration/cluster/cluster_test.go 
b/tests/gocase/integration/cluster/cluster_test.go
index fad7d012..541c2a99 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -511,7 +511,7 @@ func TestClusterReset(t *testing.T) {
                slotNum := 1
                require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
                clusterInfo := rdb1.ClusterInfo(ctx).Val()
-               require.Contains(t, clusterInfo, "importing_slot: 1")
+               require.Contains(t, clusterInfo, "importing_slot(s): 1")
                require.Contains(t, clusterInfo, "import_state: start")
                require.Contains(t, rdb1.ClusterResetHard(ctx).Err(), "Can't 
reset cluster while importing slot")
                require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", 
slotNum, 1).Val())
@@ -533,7 +533,7 @@ func TestClusterReset(t *testing.T) {
 
                require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
slotNum, id1).Val())
                clusterInfo := rdb0.ClusterInfo(ctx).Val()
-               require.Contains(t, clusterInfo, "migrating_slot: 2")
+               require.Contains(t, clusterInfo, "migrating_slot(s): 2")
                require.Contains(t, clusterInfo, "migrating_state: start")
                require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't 
reset cluster while migrating slot")
 
diff --git a/tests/gocase/integration/slotimport/slotimport_test.go 
b/tests/gocase/integration/slotimport/slotimport_test.go
index a3566cab..7b199280 100644
--- a/tests/gocase/integration/slotimport/slotimport_test.go
+++ b/tests/gocase/integration/slotimport/slotimport_test.go
@@ -83,8 +83,8 @@ func TestImportedServer(t *testing.T) {
        require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
 
        t.Run("IMPORT - error slot", func(t *testing.T) {
-               require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", -1, 
0).Err(), "Slot is out of range")
-               require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 
16384, 0).Err(), "Slot is out of range")
+               require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", -1, 
0).Err(), "Invalid slot range")
+               require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 
16384, 0).Err(), "Invalid slot id: out of numeric range")
        })
 
        t.Run("IMPORT - slot with error state", func(t *testing.T) {
@@ -106,7 +106,7 @@ func TestImportedServer(t *testing.T) {
                require.Equal(t, "slot1", rdbA.Get(ctx, slotKey).Val())
                require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", 
slotNum, 0).Val())
                clusterInfo := rdbB.ClusterInfo(ctx).Val()
-               require.Contains(t, clusterInfo, "importing_slot: 1")
+               require.Contains(t, clusterInfo, "importing_slot(s): 1")
                require.Contains(t, clusterInfo, "import_state: start")
                clusterNodes := rdbB.ClusterNodes(ctx).Val()
                require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", 
slotNum, srvAID))
@@ -114,14 +114,14 @@ func TestImportedServer(t *testing.T) {
                require.NoError(t, rdbA.Do(ctx, "clusterx", "migrate", slotNum, 
srvBID).Err())
                require.Eventually(t, func() bool {
                        clusterInfo := 
rdbA.ClusterInfo(context.Background()).Val()
-                       return strings.Contains(clusterInfo, 
fmt.Sprintf("migrating_slot: %d", slotNum)) &&
+                       return strings.Contains(clusterInfo, 
fmt.Sprintf("migrating_slot(s): %d", slotNum)) &&
                                strings.Contains(clusterInfo, 
fmt.Sprintf("migrating_state: %s", "success"))
                }, 5*time.Second, 100*time.Millisecond)
 
                // import success
                require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", 
slotNum, 1).Val())
                clusterInfo = rdbB.ClusterInfo(ctx).Val()
-               require.Contains(t, clusterInfo, "importing_slot: 1")
+               require.Contains(t, clusterInfo, "importing_slot(s): 1")
                require.Contains(t, clusterInfo, "import_state: success")
 
                // import finish and should not contain the import section
@@ -145,7 +145,7 @@ func TestImportedServer(t *testing.T) {
                time.Sleep(50 * time.Millisecond)
 
                clusterInfo := rdbB.ClusterInfo(ctx).Val()
-               require.Contains(t, clusterInfo, "importing_slot: 10")
+               require.Contains(t, clusterInfo, "importing_slot(s): 10")
                require.Contains(t, clusterInfo, "import_state: error")
 
                // get empty
@@ -165,7 +165,7 @@ func TestImportedServer(t *testing.T) {
                time.Sleep(50 * time.Millisecond)
 
                clusterInfo := rdbB.ClusterInfo(ctx).Val()
-               require.Contains(t, clusterInfo, "importing_slot: 11")
+               require.Contains(t, clusterInfo, "importing_slot(s): 11")
                require.Contains(t, clusterInfo, "import_state: error")
 
                // get empty
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go 
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index f54a428f..a0cd1f86 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -108,12 +108,6 @@ func TestSlotMigrateDestServerKilled(t *testing.T) {
        clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1, 
srv1.Host(), srv1.Port())
        require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
        require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
-
-       t.Run("MIGRATE - Slot is out of range", func(t *testing.T) {
-               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
-1, id1).Err(), "Slot is out of range")
-               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
16384, id1).Err(), "Slot is out of range")
-       })
-
        t.Run("MIGRATE - Cannot migrate slot to itself", func(t *testing.T) {
                require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 1, 
id0).Err(), "Can't migrate slot to myself")
        })
@@ -482,7 +476,7 @@ func TestSlotMigrateSync(t *testing.T) {
                        err := rdb0.Do(ctx, "clusterx", "migrate", slot, id1, 
"sync", timeout).Err()
                        // go-redis will auto-retry if occurs timeout error, so 
it may return the already migrated slot error,
                        // so we just allow this error here to prevent the 
flaky test failure.
-                       if err != nil && !strings.Contains(err.Error(), "ERR 
There is already a migrating slot") {
+                       if err != nil && !strings.Contains(err.Error(), "ERR 
There is already a migrating job") {
                                require.NoError(t, err)
                        }
                }
@@ -531,7 +525,7 @@ func TestSlotMigrateDataType(t *testing.T) {
                }
                require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
slot, id1).Val())
                otherSlot := 2
-               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
otherSlot, id1).Err(), "There is already a migrating slot")
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
otherSlot, id1).Err(), "There is already a migrating job")
                waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)
                require.EqualValues(t, cnt, rdb1.LLen(ctx, 
util.SlotTable[slot]).Val())
        })
@@ -1168,24 +1162,186 @@ func waitForMigrateState(t testing.TB, client 
*redis.Client, slot int, state Slo
        waitForMigrateStateInDuration(t, client, slot, state, 5*time.Second)
 }
 
+// waitForMigrateSlotRangeState polls CLUSTER INFO until the given slot range
+// reports the expected migration state, using the default 5-second timeout.
+func waitForMigrateSlotRangeState(t testing.TB, client *redis.Client, 
slotRange string, state SlotMigrationState) {
+       waitForMigrateSlotRangeStateInDuration(t, client, slotRange, state, 
5*time.Second)
+}
+
 func waitForMigrateStateInDuration(t testing.TB, client *redis.Client, slot 
int, state SlotMigrationState, d time.Duration) {
        require.Eventually(t, func() bool {
                i := client.ClusterInfo(context.Background()).Val()
-               return strings.Contains(i, fmt.Sprintf("migrating_slot: %d", 
slot)) &&
+               return strings.Contains(i, fmt.Sprintf("migrating_slot(s): %d", 
slot)) &&
+                       strings.Contains(i, fmt.Sprintf("migrating_state: %s", 
state))
+       }, d, 100*time.Millisecond)
+}
+
+// waitForMigrateSlotRangeStateInDuration polls CLUSTER INFO until the given
+// slot range reports the expected migrating_state, failing once d elapses.
+// A degenerate "N-N" range is collapsed to "N" first — presumably the server
+// prints a single-slot migration without the range form (the "11-11" case in
+// TestSlotRangeMigrate relies on this); confirm against the server output.
+func waitForMigrateSlotRangeStateInDuration(t testing.TB, client 
*redis.Client, slotRange string, state SlotMigrationState, d time.Duration) {
+       slots := strings.Split(slotRange, "-")
+       if len(slots) == 2 && slots[0] == slots[1] {
+               slotRange = slots[0]
+       }
+       require.Eventually(t, func() bool {
+               i := client.ClusterInfo(context.Background()).Val()
+               return strings.Contains(i, fmt.Sprintf("migrating_slot(s): %s", 
slotRange)) &&
                        strings.Contains(i, fmt.Sprintf("migrating_state: %s", 
state))
        }, d, 100*time.Millisecond)
 }
 
 func requireMigrateState(t testing.TB, client *redis.Client, slot int, state 
SlotMigrationState) {
        i := client.ClusterInfo(context.Background()).Val()
-       require.Contains(t, i, fmt.Sprintf("migrating_slot: %d", slot))
+       require.Contains(t, i, fmt.Sprintf("migrating_slot(s): %d", slot))
+       require.Contains(t, i, fmt.Sprintf("migrating_state: %s", state))
+}
+
+// requireMigrateSlotRangeState asserts that the current CLUSTER INFO output
+// reports the given slot range in the given migration state (no polling).
+// NOTE(review): unlike waitForMigrateSlotRangeState, a "N-N" range is not
+// collapsed to "N" here — callers must pass the exact printed form.
+func requireMigrateSlotRangeState(t testing.TB, client *redis.Client, 
slotRange string, state SlotMigrationState) {
+       i := client.ClusterInfo(context.Background()).Val()
+       require.Contains(t, i, fmt.Sprintf("migrating_slot(s): %s", slotRange))
        require.Contains(t, i, fmt.Sprintf("migrating_state: %s", state))
 }
 
 func waitForImportState(t testing.TB, client *redis.Client, n int, state 
SlotImportState) {
        require.Eventually(t, func() bool {
                i := client.ClusterInfo(context.Background()).Val()
-               return strings.Contains(i, fmt.Sprintf("importing_slot: %d", 
n)) &&
+               return strings.Contains(i, fmt.Sprintf("importing_slot(s): %d", 
n)) &&
                        strings.Contains(i, fmt.Sprintf("import_state: %s", 
state))
        }, 10*time.Second, 100*time.Millisecond)
 }
+// migrateSlotRangeAndSetSlot migrates slotRange from source to the node
+// destID, waits for the migration to report success, then bumps the cluster
+// topology on both nodes with SETSLOT so destID owns the range everywhere.
+func migrateSlotRangeAndSetSlot(t *testing.T, ctx context.Context, source 
*redis.Client, dest *redis.Client, destID string, slotRange string) {
+       require.Equal(t, "OK", source.Do(ctx, "clusterx", "migrate", slotRange, 
destID).Val())
+       waitForMigrateSlotRangeState(t, source, slotRange, 
SlotMigrationStateSuccess)
+       // NOTE(review): errors from "clusterx version" are discarded; on error
+       // the version would be 0 and SETSLOT would be issued with version 1 —
+       // confirm that cannot happen here, or require.NoError these instead.
+       sourceVersion, _ := source.Do(ctx, "clusterx", "version").Int()
+       destVersion, _ := dest.Do(ctx, "clusterx", "version").Int()
+       require.NoError(t, source.Do(ctx, "clusterx", "setslot", slotRange, 
"node", destID, sourceVersion+1).Err())
+       require.NoError(t, dest.Do(ctx, "clusterx", "setslot", slotRange, 
"node", destID, destVersion+1).Err())
+}
+
+// TestSlotRangeMigrate exercises CLUSTERX MIGRATE with a slot-range argument
+// ("a-b") on a two-node cluster: basic range migration, degenerate ranges
+// ("N", "N-N"), invalid range syntax, re-migration of already-moved ranges
+// (with and without a subsequent SETSLOT), and failure when the destination
+// node dies mid-migration.
+func TestSlotRangeMigrate(t *testing.T) {
+       ctx := context.Background()
+
+       srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       rdb0 := srv0.NewClient()
+       defer func() { require.NoError(t, rdb0.Close()) }()
+       defer func() { srv0.Close() }()
+       id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+       srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       srv1Alive := true
+       defer func() {
+               if srv1Alive {
+                       srv1.Close()
+               }
+       }()
+
+       rdb1 := srv1.NewClient()
+       defer func() { require.NoError(t, rdb1.Close()) }()
+       id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+       // Node 0 owns slots 0-10000, node 1 owns 10001-16383.
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id0, 
srv0.Host(), srv0.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1, 
srv1.Host(), srv1.Port())
+       require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, 
"1").Err())
+
+       // Seed each of the first 500 slots with a small list so migrations
+       // actually move data.
+       for slot := 0; slot < 500; slot++ {
+               for i := 0; i < 10; i++ {
+                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[slot], i).Err())
+               }
+       }
+
+       t.Run("MIGRATE - Slot range migration basic cases", func(t *testing.T) {
+               migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "0-9")
+               nodes := rdb0.ClusterNodes(ctx).Val()
+               // NOTE(review): the fourth argument to require.Contains is
+               // msgAndArgs (a failure message), not a second expected value —
+               // confirm "0-9 10001-16383" was not meant to be asserted too
+               // (same pattern recurs below).
+               require.Contains(t, nodes, "10-10000", "0-9 10001-16383")
+       })
+
+       t.Run("MIGRATE - Special slot range cases", func(t *testing.T) {
+               for i := 0; i < 10; i++ {
+                       require.NoError(t, rdb1.LPush(ctx, 
util.SlotTable[16383], i).Err())
+               }
+
+               // Single slot, degenerate range, and the last slot as a range.
+               migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "10")
+               time.Sleep(1 * time.Second)
+               migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "11-11")
+               time.Sleep(1 * time.Second)
+               migrateSlotRangeAndSetSlot(t, ctx, rdb1, rdb0, id0, 
"16383-16383")
+               time.Sleep(1 * time.Second)
+               nodes := rdb0.ClusterNodes(ctx).Val()
+               require.Contains(t, nodes, "12-10000 16383", "0-11 10001-16382")
+
+               // Malformed or out-of-bounds ranges must be rejected.
+               errMsg := "Invalid slot range"
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"-1", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"2-1", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"-1-3", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"-1--1", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"3--1", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"4-16384", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"16384-16384", id1).Err(), errMsg)
+
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"16384", id1).Err(), "Invalid slot id: out of numeric range")
+       })
+
+       t.Run("MIGRATE - Repeat migration cases", func(t *testing.T) {
+               // non-overlapping
+               migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "104-106")
+               time.Sleep(1 * time.Second)
+               migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "107-108")
+               time.Sleep(1 * time.Second)
+               migrateSlotRangeAndSetSlot(t, ctx, rdb0, rdb1, id1, "102-103")
+               time.Sleep(1 * time.Second)
+               nodes := rdb0.ClusterNodes(ctx).Val()
+               require.Contains(t, nodes, "12-101 109-10000 16383", "0-11 
102-108 10001-16382")
+
+               // overlap: ranges touching already-migrated slots are rejected
+               // because the topology was updated via SETSLOT above.
+               errMsg := "Can't migrate slot which doesn't belong to me"
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"100-102", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"100-104", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"108-109", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"106-109", id1).Err(), errMsg)
+
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"102-108", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"101-109", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"105", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"105-105", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"104-106", id1).Err(), errMsg)
+       })
+
+       t.Run("MIGRATE - Repeat migration cases, but does not immediately 
update the topology via setslot", func(t *testing.T) {
+               // non-overlapping
+               require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
"114-116", id1).Val())
+               waitForMigrateSlotRangeState(t, rdb0, "114-116", 
SlotMigrationStateSuccess)
+               require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
"117-118", id1).Val())
+               waitForMigrateSlotRangeState(t, rdb0, "117-118", 
SlotMigrationStateSuccess)
+               require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
"112-113", id1).Val())
+               waitForMigrateSlotRangeState(t, rdb0, "112-113", 
SlotMigrationStateSuccess)
+               // Migrated but not SETSLOT'd: writes must be redirected.
+               for slot := 112; slot <= 118; slot++ {
+                       require.Contains(t, rdb0.LPush(ctx, 
util.SlotTable[slot], 10).Err(), "MOVED")
+               }
+
+               // overlap: a different error than above, since the slots were
+               // migrated away but the topology was not bumped.
+               errMsg := "Can't migrate slot which has been migrated"
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"114-116", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"117-118", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"112", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"112-112", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"113", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"113-113", id1).Err(), errMsg)
+
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"112-113", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"112-120", id1).Err(), errMsg)
+               require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 
"110-112", id1).Err(), errMsg)
+       })
+
+       t.Run("MIGRATE - Failure cases", func(t *testing.T) {
+               // Make one slot large so the migration is still in progress
+               // when the destination is killed below.
+               largeSlot := 210
+               for i := 0; i < 20000; i++ {
+                       require.NoError(t, rdb0.LPush(ctx, 
util.SlotTable[largeSlot], i).Err())
+               }
+               require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
"200-220", id1).Val())
+               requireMigrateSlotRangeState(t, rdb0, "200-220", 
SlotMigrationStateStarted)
+               srv1Alive = false
+               srv1.Close()
+               time.Sleep(time.Second)
+               // TODO: More precise migration failure slot range
+               waitForMigrateSlotRangeState(t, rdb0, "200-220", 
SlotMigrationStateFailed)
+       })
+}
diff --git a/tests/gocase/unit/geo/geo_test.go 
b/tests/gocase/unit/geo/geo_test.go
index ebbbcd32..a920ddab 100644
--- a/tests/gocase/unit/geo/geo_test.go
+++ b/tests/gocase/unit/geo/geo_test.go
@@ -38,9 +38,10 @@ func geoDegrad(deg float64) float64 {
        return deg * math.Atan(1) * 8 / 360
 }
 
-func geoRandomPoint() (float64, float64) {
-       lon := (-180 + rand.Float64()*360)
-       lat := (-70 + rand.Float64()*140)
+// geoRandomPointWithSeed returns a pseudo-random point with a longitude in
+// [-180, 180) and a latitude in [-70, 70), drawn from a generator freshly
+// seeded with seed. The same seed always yields the same point — NOTE(review):
+// the randomized GEO test below calls this in a 20000-iteration loop with a
+// fixed seed, so every generated point is identical; confirm that is intended.
+func geoRandomPointWithSeed(seed int64) (longitude, latitude float64) {
+       rng := rand.New(rand.NewSource(seed))
+       longitude = rng.Float64()*360 - 180
+       latitude = rng.Float64()*140 - 70
+       return longitude, latitude
+}
 
@@ -434,26 +435,26 @@ var testGeo = func(t *testing.T, enabledRESP3 string) {
        t.Run("GEOADD + GEORANGE randomized test", func(t *testing.T) {
                for attempt := 0; attempt < 30; attempt++ {
                        var debuginfo string
+                       var seed int64
                        if attempt < len(regressionVectors) {
-                               rand.Seed(regressionVectors[attempt].seed)
-                               debuginfo += "rand seed is " + 
strconv.FormatInt(regressionVectors[attempt].seed, 10)
+                               seed = regressionVectors[attempt].seed
                        } else {
-                               tmp := time.Now().UnixNano()
-                               rand.Seed(tmp)
-                               debuginfo += "rand seed is " + 
strconv.FormatInt(tmp, 10)
+                               seed = time.Now().UnixNano()
                        }
+                       debuginfo += "rand seed is " + strconv.FormatInt(seed, 
10)
+
                        require.NoError(t, rdb.Del(ctx, "mypoints").Err())
                        var radiusKm int64
-                       if util.RandomInt(10) == 0 {
-                               radiusKm = util.RandomInt(50000) + 10
+                       if util.RandomIntWithSeed(10, seed) == 0 {
+                               radiusKm = util.RandomIntWithSeed(50000, seed) 
+ 10
                        } else {
-                               radiusKm = util.RandomInt(200) + 10
+                               radiusKm = util.RandomIntWithSeed(200, seed) + 
10
                        }
                        if attempt < len(regressionVectors) {
                                radiusKm = regressionVectors[attempt].km
                        }
                        radiusM := radiusKm * 1000
-                       searchLon, searchLat := geoRandomPoint()
+                       searchLon, searchLat := geoRandomPointWithSeed(seed)
                        if attempt < len(regressionVectors) {
                                searchLon = regressionVectors[attempt].lon
                                searchLat = regressionVectors[attempt].lat
@@ -462,7 +463,7 @@ var testGeo = func(t *testing.T, enabledRESP3 string) {
                        var result []string
                        var argvs []*redis.GeoLocation
                        for j := 0; j < 20000; j++ {
-                               lon, lat := geoRandomPoint()
+                               lon, lat := geoRandomPointWithSeed(seed)
                                argvs = append(argvs, 
&redis.GeoLocation{Longitude: lon, Latitude: lat, Name: "place:" + 
strconv.Itoa(j)})
                                distance := geoDistance(lon, lat, searchLon, 
searchLat)
                                if distance < float64(radiusM) {
diff --git a/tests/gocase/unit/type/list/list_test.go 
b/tests/gocase/unit/type/list/list_test.go
index e7f744e0..c440ebab 100644
--- a/tests/gocase/unit/type/list/list_test.go
+++ b/tests/gocase/unit/type/list/list_test.go
@@ -53,7 +53,6 @@ func TestLTRIM(t *testing.T) {
        key := "myList"
        startLen := int64(32)
 
-       rand.Seed(0)
        for typ, value := range largeValue {
                t.Run(fmt.Sprintf("LTRIM stress testing - %s", typ), func(t 
*testing.T) {
                        var myList []string
@@ -99,8 +98,6 @@ func TestZipList(t *testing.T) {
        })
        defer func() { require.NoError(t, rdb.Close()) }()
 
-       rand.Seed(0)
-
        t.Run("Explicit regression for a list bug", func(t *testing.T) {
                key := "l"
                myList := []string{
diff --git a/tests/gocase/unit/type/zset/zset_test.go 
b/tests/gocase/unit/type/zset/zset_test.go
index 04807e4d..e3d60912 100644
--- a/tests/gocase/unit/type/zset/zset_test.go
+++ b/tests/gocase/unit/type/zset/zset_test.go
@@ -612,7 +612,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx 
context.Context, enabledRES
                for i := 0; i < 20; i++ {
                        var args [3]int64
                        for j := 0; j < 3; j++ {
-                               rand.Seed(time.Now().UnixNano())
+                               rand := 
rand.New(rand.NewSource(time.Now().UnixNano()))
                                args[j] = rand.Int63n(20) - 10
                        }
                        if args[2] == 0 {
diff --git a/tests/gocase/util/random.go b/tests/gocase/util/random.go
index 15acc4b8..f0a7c020 100644
--- a/tests/gocase/util/random.go
+++ b/tests/gocase/util/random.go
@@ -45,6 +45,11 @@ func RandomInt(max int64) int64 {
        return rand.Int63() % max
 }
 
+// RandomIntWithSeed returns a pseudo-random integer in [0, max) drawn from a
+// generator freshly seeded with seed, so the same (max, seed) pair always
+// yields the same value — useful for reproducing failures from a logged seed.
+func RandomIntWithSeed(max, seed int64) int64 {
+       return rand.New(rand.NewSource(seed)).Int63() % max
+}
+
 func RandomBool() bool {
        return RandomInt(2) != 0
 }
@@ -114,3 +119,38 @@ func RandomValue() string {
                },
        )
 }
+
+// RandomValueWithSeed returns a random value string shaped like RandomValue
+// (small int, 32/64-bit int, or random string), with the numeric branches
+// drawn from a generator seeded with seed so a failing run can be reproduced
+// from a logged seed. The original body ignored its seed parameter entirely
+// and drew from the global source.
+// NOTE(review): RandPath, RandomSignedInt and RandString still use the global
+// source, so the result is not yet fully determined by seed — consider
+// threading the seeded generator through those helpers as well.
+func RandomValueWithSeed(seed int64) string {
+       r := rand.New(rand.NewSource(seed))
+       return RandPath(
+               // Small enough to likely collide
+               func() string {
+                       return fmt.Sprintf("%d", RandomSignedInt(1000))
+               },
+               // 32 bit compressible signed/unsigned
+               func() string {
+                       return RandPath(
+                               func() string {
+                                       return fmt.Sprintf("%d", 
r.Int63n(2000000000))
+                               },
+                               func() string {
+                                       return fmt.Sprintf("%d", 
r.Int63n(4000000000))
+                               },
+                       )
+               },
+               // 64 bit
+               func() string {
+                       return fmt.Sprintf("%d", r.Int63n(1000000000000))
+               },
+               // Random string
+               func() string {
+                       return RandPath(
+                               func() string {
+                                       return RandString(0, 256, Alpha)
+                               },
+                               func() string {
+                                       return RandString(0, 256, Binary)
+                               },
+                       )
+               },
+       )
+}

Reply via email to