git-hulk commented on code in PR #3240:
URL: https://github.com/apache/kvrocks/pull/3240#discussion_r2490342741


##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1263,51 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   auto slot_range = slot_range_.load();
   info("[migrate] Migrating snapshot of slot(s) {} by raw key value", 
slot_range.String());
 
-  auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
-  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+  int total_slots = slot_range.end - slot_range.start + 1;
+  unsigned int max_parallelism = std::thread::hardware_concurrency();
+  unsigned int available_slots = static_cast<unsigned int>(total_slots);
+  int parallelism = static_cast<int>(std::min(max_parallelism, 
available_slots));
+  int base = total_slots / parallelism;
+  int rem = total_slots % parallelism;
+
+  std::vector<std::future<Status>> results;
+  int cur_start = slot_range.start;
+  for (int i = 0; i < parallelism; i++) {
+    int count = base + (i < rem ? 1 : 0);
+    int cur_end = cur_start + count - 1;
+    if (count <= 0) break;
+
+    results.emplace_back(std::async(std::launch::async, [=]() -> Status {
+      int fd = createConnectToDstNode();
+      if (fd < 0) {
+        return Status(Status::NotOK, fmt::format("thread[{}] 
createConnectToDstNode failed", i));
+      }
+      auto s = migrateSlotRange(cur_start, cur_end, parallelism, fd);
+      close(fd);
+      return s;
+    }));
+
+    cur_start = cur_end + 1;
+  }
+
+  // Wait til finish
+  for (auto &result : results) {
+    auto s = result.get();

Review Comment:
   @hll1213181368 Seems this comment was resolved? Did I miss anything?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to