git-hulk commented on code in PR #3240:
URL: https://github.com/apache/kvrocks/pull/3240#discussion_r2484094630


##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1263,51 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   auto slot_range = slot_range_.load();
   info("[migrate] Migrating snapshot of slot(s) {} by raw key value", 
slot_range.String());
 
-  auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
-  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+  int total_slots = slot_range.end - slot_range.start + 1;
+  unsigned int max_parallelism = std::thread::hardware_concurrency();
+  unsigned int available_slots = static_cast<unsigned int>(total_slots);
+  int parallelism = static_cast<int>(std::min(max_parallelism, 
available_slots));
+  int base = total_slots / parallelism;
+  int rem = total_slots % parallelism;
+
+  std::vector<std::future<Status>> results;
+  int cur_start = slot_range.start;
+  for (int i = 0; i < parallelism; i++) {
+    int count = base + (i < rem ? 1 : 0);
+    int cur_end = cur_start + count - 1;
+    if (count <= 0) break;

Review Comment:
   When the count would be 0?



##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1263,51 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   auto slot_range = slot_range_.load();
   info("[migrate] Migrating snapshot of slot(s) {} by raw key value", 
slot_range.String());
 
-  auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
-  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+  int total_slots = slot_range.end - slot_range.start + 1;
+  unsigned int max_parallelism = std::thread::hardware_concurrency();
+  unsigned int available_slots = static_cast<unsigned int>(total_slots);
+  int parallelism = static_cast<int>(std::min(max_parallelism, 
available_slots));
+  int base = total_slots / parallelism;
+  int rem = total_slots % parallelism;

Review Comment:
   Those two should be `slots_per_thread` and `remain_slots?`



##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1263,51 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   auto slot_range = slot_range_.load();
   info("[migrate] Migrating snapshot of slot(s) {} by raw key value", 
slot_range.String());
 
-  auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
-  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+  int total_slots = slot_range.end - slot_range.start + 1;
+  unsigned int max_parallelism = std::thread::hardware_concurrency();
+  unsigned int available_slots = static_cast<unsigned int>(total_slots);
+  int parallelism = static_cast<int>(std::min(max_parallelism, 
available_slots));
+  int base = total_slots / parallelism;
+  int rem = total_slots % parallelism;
+
+  std::vector<std::future<Status>> results;
+  int cur_start = slot_range.start;
+  for (int i = 0; i < parallelism; i++) {
+    int count = base + (i < rem ? 1 : 0);
+    int cur_end = cur_start + count - 1;
+    if (count <= 0) break;
+
+    results.emplace_back(std::async(std::launch::async, [=]() -> Status {
+      int fd = createConnectToDstNode();
+      if (fd < 0) {
+        return Status(Status::NotOK, fmt::format("thread[{}] 
createConnectToDstNode failed", i));

Review Comment:
   ```suggestion
           return Status(Status::NotOK, fmt::format("failed to connect the 
destination node in thread[{}]", i));
   ```



##########
src/cluster/slot_migrate.cc:
##########
@@ -1325,16 +1370,44 @@ Status SlotMigrator::sendSnapshotByRawKV() {
 
   GET_OR_RET(sendMigrationBatch(&batch_sender));
 
-  auto elapsed = util::GetTimeStampMS() - start_ts;
-  info(
-      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} 
ms, sent: {} bytes, rate: {:.2f} kb/s, "
-      "batches: {}, entries: {}",
-      slot_range.String(), elapsed, batch_sender.GetSentBytes(), 
batch_sender.GetRate(start_ts),
-      batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
-
   return Status::OK();
 }
 
+int SlotMigrator::createConnectToDstNode() {

Review Comment:
   You can just `SockConnect` instead of creating another one.



##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1263,51 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   auto slot_range = slot_range_.load();
   info("[migrate] Migrating snapshot of slot(s) {} by raw key value", 
slot_range.String());
 
-  auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
-  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+  int total_slots = slot_range.end - slot_range.start + 1;
+  unsigned int max_parallelism = std::thread::hardware_concurrency();
+  unsigned int available_slots = static_cast<unsigned int>(total_slots);

Review Comment:
   I think this line is unnecessary; you can just cast max_parallelism to `int`.



##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1263,51 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   auto slot_range = slot_range_.load();
   info("[migrate] Migrating snapshot of slot(s) {} by raw key value", 
slot_range.String());
 
-  auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
-  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+  int total_slots = slot_range.end - slot_range.start + 1;
+  unsigned int max_parallelism = std::thread::hardware_concurrency();
+  unsigned int available_slots = static_cast<unsigned int>(total_slots);
+  int parallelism = static_cast<int>(std::min(max_parallelism, 
available_slots));
+  int base = total_slots / parallelism;
+  int rem = total_slots % parallelism;
+
+  std::vector<std::future<Status>> results;
+  int cur_start = slot_range.start;
+  for (int i = 0; i < parallelism; i++) {
+    int count = base + (i < rem ? 1 : 0);
+    int cur_end = cur_start + count - 1;
+    if (count <= 0) break;
+
+    results.emplace_back(std::async(std::launch::async, [=]() -> Status {
+      int fd = createConnectToDstNode();
+      if (fd < 0) {
+        return Status(Status::NotOK, fmt::format("thread[{}] 
createConnectToDstNode failed", i));
+      }
+      auto s = migrateSlotRange(cur_start, cur_end, parallelism, fd);
+      close(fd);
+      return s;
+    }));
+
+    cur_start = cur_end + 1;
+  }
+
+  // Wait til finish
+  for (auto &result : results) {
+    auto s = result.get();

Review Comment:
   What if one of the threads fails? Others are still migrating? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to