Copilot commented on code in PR #3299:
URL: https://github.com/apache/kvrocks/pull/3299#discussion_r2634152725
##########
src/cluster/slot_migrate.cc:
##########
@@ -1325,20 +1368,32 @@ Status SlotMigrator::sendSnapshotByRawKV() {
GET_OR_RET(sendMigrationBatch(&batch_sender));
-  auto elapsed = util::GetTimeStampMS() - start_ts;
-  info(
-      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
-      "batches: {}, entries: {}",
-      slot_range.String(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
-      batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
-
return Status::OK();
}
+int SlotMigrator::createConnectToDstNode() {
+ // Connect to the destination node
+ auto fd = util::SockConnect(dst_ip_, dst_port_);
+ if (!fd.IsOK()) {
+ error("failed to connect to the node error: {}", fd.Msg());
+ return -1;
+ }
+
+ std::string pass = srv_->GetConfig()->requirepass;
+ if (!pass.empty()) {
+ auto s = authOnDstNode(*fd, pass);
+ if (!s.IsOK()) {
+ error("failed to authenticate on destination node error: {}", s.Msg());
+ return -1;
Review Comment:
The error handling returns -1 but doesn't close the file descriptor that was
successfully created by SockConnect. When authentication fails, the established
connection is leaked. The file descriptor from SockConnect should be closed
before returning on authentication failure.
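A minimal sketch of the fix, reusing only calls that already appear in this diff
(close() is used elsewhere in the PR for the per-thread sockets):
```cpp
  std::string pass = srv_->GetConfig()->requirepass;
  if (!pass.empty()) {
    auto s = authOnDstNode(*fd, pass);
    if (!s.IsOK()) {
      error("failed to authenticate on destination node error: {}", s.Msg());
      close(*fd);  // release the socket established by SockConnect so it is not leaked
      return -1;
    }
  }
```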
##########
kvrocks.conf:
##########
@@ -723,13 +723,17 @@ migrate-batch-size-kb 16
# Default: 16M
migrate-batch-rate-limit-mb 16
-
# If it is set to yes, kvrocks will skip the deallocation of block cache
# while closing the database to speed up the shutdown
#
# Default: no
# skip-block-cache-deallocation-on-close no
+# The parallelism of slot migration passing SST files
+#
+# Default: the number of Kvrocks node cores
Review Comment:
The comment is incomplete - it should be "Default: 0 (which uses the number
of Kvrocks node cores)" to clarify that 0 is a valid value that triggers
automatic detection, not just stating what the automatic value would be.
```suggestion
# Default: 0 (which uses the number of Kvrocks node cores)
```
##########
src/cluster/slot_migrate.cc:
##########
@@ -69,6 +73,7 @@ SlotMigrator::SlotMigrator(Server *srv)
// [Note]:
  // This problem may exist in all functions of Database called in slot migration process.
metadata_cf_handle_ = nullptr;
+
global_rate_limiter_.reset(rocksdb::NewGenericRateLimiter(static_cast<int64_t>(migrate_batch_bytes_per_sec_)));
Review Comment:
The global_rate_limiter_ is shared across multiple threads without any
synchronization mechanism. While RocksDB's RateLimiter is designed to be
thread-safe, the shared_ptr itself needs to be properly initialized before
being accessed by multiple threads. Currently, it's initialized in the
constructor but then accessed in parallel threads in migrateSlotRange. Ensure
that the rate limiter is fully initialized before any parallel migration begins.
##########
src/cluster/slot_migrate.h:
##########
@@ -173,6 +179,9 @@ class SlotMigrator : public redis::Database {
uint64_t seq_gap_limit_ = kDefaultSequenceGapLimit;
std::atomic<size_t> migrate_batch_bytes_per_sec_ = 1 * GiB;
std::atomic<size_t> migrate_batch_size_bytes_;
+ int migrate_slots_send_snapshots_parallelism_ = 0;
Review Comment:
The member variable is initialized to 0, which would be treated as an
invalid value by SetMigrateSlotsSendSnapshotsParallelism. However, the
configuration callback sets it to hardware_concurrency() when 0 is detected.
This creates a potential race condition or inconsistent state if the setter is
called before the configuration is fully initialized. Consider initializing
this to a sensible default value (e.g., 1 or
std::thread::hardware_concurrency()) in the constructor instead of relying on
the callback.
```suggestion
int migrate_slots_send_snapshots_parallelism_ =
std::thread::hardware_concurrency() == 0
? 1
: static_cast<int>(std::thread::hardware_concurrency());
```
##########
src/cluster/slot_migrate.h:
##########
@@ -99,6 +100,9 @@ class SlotMigrator : public redis::Database {
void SetSequenceGapLimit(int value) {
if (value > 0) seq_gap_limit_ = value;
}
+ void SetMigrateSlotsSendSnapshotsParallelism(int value) {
+ if (value > 0) migrate_slots_send_snapshots_parallelism_ = value;
+ }
Review Comment:
The validation only checks if the value is greater than 0, but the
configuration allows 0 as a valid value (minimum is 0 in IntField). This
inconsistency means if a user explicitly sets the value to 0, it will be
accepted in configuration but ignored here. Consider aligning the validation
with the configuration constraints or handling 0 explicitly as a special case
in the setter.
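A hedged sketch of one way to align them, treating 0 as an explicit "auto" value in
the setter (assumes <thread> is available in this header):
```cpp
  void SetMigrateSlotsSendSnapshotsParallelism(int value) {
    if (value == 0) {
      // 0 means "auto": fall back to the hardware concurrency, with 1 as a floor
      auto hw = static_cast<int>(std::thread::hardware_concurrency());
      migrate_slots_send_snapshots_parallelism_ = hw > 0 ? hw : 1;
    } else if (value > 0) {
      migrate_slots_send_snapshots_parallelism_ = value;
    }
  }
```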
##########
src/cluster/slot_migrate.cc:
##########
@@ -1325,20 +1368,32 @@ Status SlotMigrator::sendSnapshotByRawKV() {
GET_OR_RET(sendMigrationBatch(&batch_sender));
-  auto elapsed = util::GetTimeStampMS() - start_ts;
-  info(
-      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
-      "batches: {}, entries: {}",
-      slot_range.String(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
-      batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
-
return Status::OK();
Review Comment:
The logging information about migration progress (bytes sent, rate, batches,
entries) has been removed from the individual thread migrations. This makes
debugging and monitoring parallel migrations more difficult, as there's no
per-thread visibility. Consider adding aggregate logging or at least
debug-level logs for each thread's progress to aid troubleshooting.
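A sketch of what a per-range summary could look like at the end of migrateSlotRange,
reusing the counters from the removed log line (start_ts is assumed to be captured
when the per-thread range migration starts):
```cpp
  // Per-thread summary once the range has been sent (sketch).
  auto elapsed = util::GetTimeStampMS() - start_ts;
  info(
      "[migrate] Finished snapshot range, slots: [{}-{}], elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
      "batches: {}, entries: {}",
      start_slot, end_slot, elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
      batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
```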
##########
src/config/config.cc:
##########
@@ -610,6 +612,16 @@ void Config::initFieldCallback() {
            srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB);
            return Status::OK();
          }},
+        {"migrate-slots-send-snapshots-parallelism",
+         [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
+           if (migrate_slots_send_snapshots_parallelism == 0) {
+             unsigned int max_parallelism = std::thread::hardware_concurrency();
+             migrate_slots_send_snapshots_parallelism = static_cast<int>(max_parallelism);
+           }
+           if (!srv) return Status::OK();
+
+           srv->slot_migrator->SetMigrateSlotsSendSnapshotsParallelism(migrate_slots_send_snapshots_parallelism);
Review Comment:
The callback modifies the configuration value
(migrate_slots_send_snapshots_parallelism) directly when it's set to 0, but
this modified value won't be persisted to the configuration file. This could
lead to inconsistency between the runtime value and the saved configuration.
Consider whether the default should be set during initialization instead, or if
the behavior should be documented more clearly.
```suggestion
           int effective_parallelism = migrate_slots_send_snapshots_parallelism;
           if (effective_parallelism == 0) {
             unsigned int max_parallelism = std::thread::hardware_concurrency();
             effective_parallelism = static_cast<int>(max_parallelism);
           }
           if (!srv) return Status::OK();
           srv->slot_migrator->SetMigrateSlotsSendSnapshotsParallelism(effective_parallelism);
```
##########
src/cluster/slot_migrate.cc:
##########
@@ -1251,7 +1256,6 @@ void SlotMigrator::resumeSyncCtx(const Status &migrate_result) {
Status SlotMigrator::sendMigrationBatch(BatchSender *batch) {
// user may dynamically change some configs, apply it when send data
batch->SetMaxBytes(migrate_batch_size_bytes_);
Review Comment:
The removal of the SetBytesPerSecond call means that dynamic changes to
migrate_batch_bytes_per_sec_ during migration will not be applied to the global
rate limiter. Since the rate limiter is now created once in the constructor and
shared globally, updates to the config value won't take effect until the
SlotMigrator is recreated. Consider whether dynamic rate limit updates should
still be supported and if so, add thread-safe updating of the global rate
limiter.
```suggestion
batch->SetMaxBytes(migrate_batch_size_bytes_);
batch->SetBytesPerSecond(migrate_batch_bytes_per_sec_);
```
##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1264,48 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto slot_range = slot_range_.load();
info("[migrate] Migrating snapshot of slot(s) {} by raw key value",
slot_range.String());
- auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
- auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+ int total_slots = slot_range.end - slot_range.start + 1;
+ int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
Review Comment:
If parallelism ends up as 0 (when migrate_slots_send_snapshots_parallelism_ is
not properly initialized), std::min returns 0 and the subsequent
total_slots / parallelism is a division by zero; even setting that aside, no
threads would be created and no data would be migrated. Add explicit validation
to ensure parallelism is at least 1 before proceeding with the migration.
```suggestion
int parallelism = std::min(migrate_slots_send_snapshots_parallelism_,
total_slots);
if (parallelism < 1) {
parallelism = 1;
}
```
##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1264,48 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto slot_range = slot_range_.load();
info("[migrate] Migrating snapshot of slot(s) {} by raw key value",
slot_range.String());
- auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
- auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+ int total_slots = slot_range.end - slot_range.start + 1;
+ int parallelism = std::min(migrate_slots_send_snapshots_parallelism_,
total_slots);
+ int slots_per_thread = total_slots / parallelism;
+ int remain_slots = total_slots % parallelism;
+
+ std::vector<std::future<Status>> results;
+ int cur_start = slot_range.start;
+ for (int i = 0; i < parallelism; i++) {
+ int count = slots_per_thread + (i < remain_slots ? 1 : 0);
+ int cur_end = cur_start + count - 1;
+
+ results.emplace_back(std::async(std::launch::async, [=]() -> Status {
+ int fd = createConnectToDstNode();
+ if (fd < 0) {
+ return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
+ }
+ auto s = migrateSlotRange(cur_start, cur_end, fd);
+ close(fd);
+ return s;
+ }));
+
+ cur_start = cur_end + 1;
+ }
+
+ // Wait til finish
+ for (auto &result : results) {
+ auto s = result.get();
+ if (!s.IsOK()) {
+ return {Status::NotOK, fmt::format("[migrate] Parallel migrate get result error: {}", s.Msg())};
+ }
+ }
+
+ auto elapsed = util::GetTimeStampMS() - start_ts;
+ info("[migrate] Parallel snapshot migrate succeeded, slot(s) {}, elapsed: {}
ms", slot_range.String(), elapsed);
+
+ return Status::OK();
+}
+
+Status SlotMigrator::migrateSlotRange(int start_slot, int end_slot, int fd) {
+ SlotRange sub{start_slot, end_slot};
+ auto prefix = ComposeSlotKeyPrefix(namespace_, start_slot);
+ auto upper_bound = ComposeSlotKeyUpperBound(namespace_, end_slot);
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
Review Comment:
The slot_snapshot_ member variable is accessed by multiple threads without
synchronization. While RocksDB snapshots are immutable and thread-safe to read
from, the pointer itself should be properly synchronized or documented as being
set before parallel access begins. Verify that slot_snapshot_ is fully
initialized and won't change during the parallel migration phase.
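If the assumption is verified, a small sketch of stating it explicitly at the fan-out
point (assert from <cassert>, purely illustrative):
```cpp
  // slot_snapshot_ is taken once before the parallel fan-out and must stay unchanged
  // until every future returned by std::async has completed.
  assert(slot_snapshot_ != nullptr);
```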
##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1264,48 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto slot_range = slot_range_.load();
info("[migrate] Migrating snapshot of slot(s) {} by raw key value",
slot_range.String());
- auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
- auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+ int total_slots = slot_range.end - slot_range.start + 1;
+ int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
+ int slots_per_thread = total_slots / parallelism;
+ int remain_slots = total_slots % parallelism;
+
+ std::vector<std::future<Status>> results;
+ int cur_start = slot_range.start;
+ for (int i = 0; i < parallelism; i++) {
+ int count = slots_per_thread + (i < remain_slots ? 1 : 0);
+ int cur_end = cur_start + count - 1;
+
+ results.emplace_back(std::async(std::launch::async, [=]() -> Status {
+ int fd = createConnectToDstNode();
+ if (fd < 0) {
+ return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
+ }
+ auto s = migrateSlotRange(cur_start, cur_end, fd);
+ close(fd);
+ return s;
+ }));
+
+ cur_start = cur_end + 1;
+ }
+
+ // Wait til finish
+ for (auto &result : results) {
+ auto s = result.get();
+ if (!s.IsOK()) {
+ return {Status::NotOK, fmt::format("[migrate] Parallel migrate get result error: {}", s.Msg())};
+ }
+ }
+
+ auto elapsed = util::GetTimeStampMS() - start_ts;
+ info("[migrate] Parallel snapshot migrate succeeded, slot(s) {}, elapsed: {}
ms", slot_range.String(), elapsed);
+
+ return Status::OK();
+}
+
+Status SlotMigrator::migrateSlotRange(int start_slot, int end_slot, int fd) {
+ SlotRange sub{start_slot, end_slot};
+ auto prefix = ComposeSlotKeyPrefix(namespace_, start_slot);
+ auto upper_bound = ComposeSlotKeyUpperBound(namespace_, end_slot);
Review Comment:
The namespace_ member variable is accessed by multiple threads in parallel
without synchronization. While it's likely set before migration begins and not
modified during migration, this should be verified. Consider documenting
thread-safety assumptions for member variables accessed in parallel contexts.
##########
src/cluster/slot_migrate.cc:
##########
@@ -1325,20 +1368,32 @@ Status SlotMigrator::sendSnapshotByRawKV() {
GET_OR_RET(sendMigrationBatch(&batch_sender));
-  auto elapsed = util::GetTimeStampMS() - start_ts;
-  info(
-      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
-      "batches: {}, entries: {}",
-      slot_range.String(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
-      batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
-
return Status::OK();
}
Review Comment:
The dst_ip_ and dst_port_ member variables are accessed by multiple threads
without synchronization. While these are set before the parallel migration
begins and are not modified during migration, they should be documented as
thread-safe or protected. Consider making them const or adding documentation
that they must not be modified during parallel operations.
```suggestion
  // NOTE: dst_ip_ and dst_port_ are configured before any parallel migration begins
  // and are not modified during migration. They must not be mutated while parallel
  // operations are in progress, so concurrent reads from multiple threads here are
  // considered thread-safe by design.
```
##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1264,48 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto slot_range = slot_range_.load();
info("[migrate] Migrating snapshot of slot(s) {} by raw key value",
slot_range.String());
- auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
- auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+ int total_slots = slot_range.end - slot_range.start + 1;
+ int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
+ int slots_per_thread = total_slots / parallelism;
+ int remain_slots = total_slots % parallelism;
+
+ std::vector<std::future<Status>> results;
+ int cur_start = slot_range.start;
+ for (int i = 0; i < parallelism; i++) {
+ int count = slots_per_thread + (i < remain_slots ? 1 : 0);
+ int cur_end = cur_start + count - 1;
+
+ results.emplace_back(std::async(std::launch::async, [=]() -> Status {
Review Comment:
The lambda captures variables by value (using [=]), including the loop variable
'i', which is only used in the error message on line 1281. The other captured
variables (cur_start, cur_end) are correctly captured by value since they change
on each iteration, and [=] also captures this implicitly for the member-function
calls. The capture is correct as written, but consider listing the captured
variables explicitly for better code clarity.
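For illustration, the same lambda with an explicit capture list (names taken from
the diff above):
```cpp
    results.emplace_back(std::async(std::launch::async, [this, i, cur_start, cur_end]() -> Status {
      int fd = createConnectToDstNode();
      if (fd < 0) {
        return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
      }
      auto s = migrateSlotRange(cur_start, cur_end, fd);
      close(fd);
      return s;
    }));
```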
##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1264,48 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto slot_range = slot_range_.load();
info("[migrate] Migrating snapshot of slot(s) {} by raw key value",
slot_range.String());
- auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
- auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+ int total_slots = slot_range.end - slot_range.start + 1;
+ int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
+ int slots_per_thread = total_slots / parallelism;
+ int remain_slots = total_slots % parallelism;
+
+ std::vector<std::future<Status>> results;
+ int cur_start = slot_range.start;
+ for (int i = 0; i < parallelism; i++) {
+ int count = slots_per_thread + (i < remain_slots ? 1 : 0);
+ int cur_end = cur_start + count - 1;
+
+ results.emplace_back(std::async(std::launch::async, [=]() -> Status {
+ int fd = createConnectToDstNode();
+ if (fd < 0) {
+ return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
+ }
+ auto s = migrateSlotRange(cur_start, cur_end, fd);
+ close(fd);
+ return s;
+ }));
+
+ cur_start = cur_end + 1;
+ }
+
+ // Wait til finish
+ for (auto &result : results) {
+ auto s = result.get();
+ if (!s.IsOK()) {
+ return {Status::NotOK, fmt::format("[migrate] Parallel migrate get result error: {}", s.Msg())};
+ }
+ }
+
Review Comment:
When any thread fails during parallel migration, the function returns after the
first failed future without inspecting the remaining ones. The std::async futures
do block in their destructors until every thread finishes, but any additional
errors are silently dropped and the remaining work keeps running to completion
even though the migration is already doomed. Consider collecting the results of
all futures and reporting the first error, or implementing proper cancellation of
the remaining threads when one fails.
```suggestion
  Status first_error;
  bool has_error = false;
  for (auto &result : results) {
    auto s = result.get();
    if (!s.IsOK() && !has_error) {
      first_error = s;
      has_error = true;
    }
  }
  if (has_error) {
    return {Status::NotOK,
            fmt::format("[migrate] Parallel migrate get result error: {}", first_error.Msg())};
  }
```
##########
src/cluster/slot_migrate.cc:
##########
@@ -1325,20 +1368,32 @@ Status SlotMigrator::sendSnapshotByRawKV() {
GET_OR_RET(sendMigrationBatch(&batch_sender));
-  auto elapsed = util::GetTimeStampMS() - start_ts;
-  info(
-      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
-      "batches: {}, entries: {}",
-      slot_range.String(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
-      batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
-
return Status::OK();
}
+int SlotMigrator::createConnectToDstNode() {
+ // Connect to the destination node
+ auto fd = util::SockConnect(dst_ip_, dst_port_);
+ if (!fd.IsOK()) {
+ error("failed to connect to the node error: {}", fd.Msg());
+ return -1;
+ }
+
+ std::string pass = srv_->GetConfig()->requirepass;
+ if (!pass.empty()) {
+ auto s = authOnDstNode(*fd, pass);
+ if (!s.IsOK()) {
+ error("failed to authenticate on destination node error: {}", s.Msg());
+ return -1;
+ }
+ }
+ return *fd;
Review Comment:
The function signature returns an int that can be negative to indicate
failure, but the return type should more clearly express this. Consider using a
StatusOr or Result type pattern instead of returning a raw int where negative
values mean error. This would make the API more consistent with the rest of the
codebase which uses Status objects.
```suggestion
Status SlotMigrator::createConnectToDstNode(int *out_fd) {
  // Connect to the destination node
  auto fd = util::SockConnect(dst_ip_, dst_port_);
  if (!fd.IsOK()) {
    auto msg = fmt::format("failed to connect to the node error: {}", fd.Msg());
    error("{}", msg);
    return {Status::NotOK, msg};
  }
  std::string pass = srv_->GetConfig()->requirepass;
  if (!pass.empty()) {
    auto s = authOnDstNode(*fd, pass);
    if (!s.IsOK()) {
      auto msg = fmt::format("failed to authenticate on destination node error: {}", s.Msg());
      error("{}", msg);
      return {Status::NotOK, msg};
    }
  }
  *out_fd = *fd;
  return Status::OK();
```
##########
kvrocks.conf:
##########
@@ -723,13 +723,17 @@ migrate-batch-size-kb 16
# Default: 16M
migrate-batch-rate-limit-mb 16
-
# If it is set to yes, kvrocks will skip the deallocation of block cache
# while closing the database to speed up the shutdown
#
# Default: no
# skip-block-cache-deallocation-on-close no
+# The parallelism of slot migration passing SST files
Review Comment:
The comment states "passing SST files" but the actual implementation sends
snapshots by raw key-value pairs, not SST files. This is misleading. The
comment should accurately describe that this setting controls the parallelism
of sending snapshot data during slot migration.
```suggestion
# The parallelism of sending snapshot data (raw key-value pairs) during slot migration
```
##########
src/cluster/slot_migrate.cc:
##########
@@ -1260,8 +1264,48 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto slot_range = slot_range_.load();
info("[migrate] Migrating snapshot of slot(s) {} by raw key value",
slot_range.String());
- auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
- auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+ int total_slots = slot_range.end - slot_range.start + 1;
+ int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
+ int slots_per_thread = total_slots / parallelism;
+ int remain_slots = total_slots % parallelism;
+
+ std::vector<std::future<Status>> results;
+ int cur_start = slot_range.start;
+ for (int i = 0; i < parallelism; i++) {
+ int count = slots_per_thread + (i < remain_slots ? 1 : 0);
+ int cur_end = cur_start + count - 1;
+
+ results.emplace_back(std::async(std::launch::async, [=]() -> Status {
+ int fd = createConnectToDstNode();
+ if (fd < 0) {
+ return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
+ }
Review Comment:
Each parallel thread creates its own connection to the destination node via
createConnectToDstNode(), but there's no mechanism to ensure these connections
don't overwhelm the destination node. Consider adding configuration or
documentation about the impact of parallel connections, or implementing
connection pooling/throttling to prevent resource exhaustion on the destination.
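One lightweight option is to bound the fan-out explicitly; a sketch, where
kMaxParallelDstConnections is an illustrative constant rather than an existing
config option:
```cpp
  // Cap the number of concurrent connections opened against the destination node (sketch).
  constexpr int kMaxParallelDstConnections = 8;  // hypothetical ceiling, could come from a config field
  int parallelism =
      std::min({migrate_slots_send_snapshots_parallelism_, total_slots, kMaxParallelDstConnections});
```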
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]