This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new bf56a05f chore: improve the style and document for SlotMigration
(#2465)
bf56a05f is described below
commit bf56a05f93f6406f67db3bed54114a3e64ffe351
Author: mwish <[email protected]>
AuthorDate: Mon Aug 5 16:34:31 2024 +0800
chore: improve the style and document for SlotMigration (#2465)
---
src/cluster/slot_migrate.cc | 49 +++++++++++++++++++++++++++++++--------------
src/cluster/slot_migrate.h | 29 +++++++++++++++++----------
src/stats/stats.h | 2 +-
3 files changed, 53 insertions(+), 27 deletions(-)
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index f91c9a1a..cd7e9e42 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -34,10 +34,10 @@
#include "time_util.h"
#include "types/redis_stream_base.h"
-const char *errFailedToSendCommands = "failed to send commands to restore a
key";
-const char *errMigrationTaskCanceled = "key migration stopped due to a task
cancellation";
-const char *errFailedToSetImportStatus = "failed to set import status on
destination node";
-const char *errUnsupportedMigrationType = "unsupported migration type";
+constexpr std::string_view errFailedToSendCommands = "failed to send commands
to restore a key";
+constexpr std::string_view errMigrationTaskCanceled = "key migration stopped
due to a task cancellation";
+constexpr std::string_view errFailedToSetImportStatus = "failed to set import
status on destination node";
+constexpr std::string_view errUnsupportedMigrationType = "unsupported
migration type";
static std::map<RedisType, std::string> type_to_cmd = {
{kRedisString, "set"}, {kRedisList, "rpush"}, {kRedisHash, "hmset"},
{kRedisSet, "sadd"},
@@ -318,7 +318,7 @@ Status SlotMigrator::sendSnapshot() {
} else if (migration_type_ == MigrationType::kRawKeyValue) {
return sendSnapshotByRawKV();
}
- return {Status::NotOK, errUnsupportedMigrationType};
+ return {Status::NotOK, std::string(errUnsupportedMigrationType)};
}
Status SlotMigrator::syncWAL() {
@@ -327,7 +327,7 @@ Status SlotMigrator::syncWAL() {
} else if (migration_type_ == MigrationType::kRawKeyValue) {
return syncWALByRawKV();
}
- return {Status::NotOK, errUnsupportedMigrationType};
+ return {Status::NotOK, std::string(errUnsupportedMigrationType)};
}
Status SlotMigrator::sendSnapshotByCmd() {
@@ -337,7 +337,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
std::string restore_cmds;
SlotRange slot_range = slot_range_;
- LOG(INFO) << "[migrate] Start migrating snapshot of slot(s)" <<
slot_range.String();
+ LOG(INFO) << "[migrate] Start migrating snapshot of slot(s): " <<
slot_range.String();
// Construct key prefix to iterate the keys belonging to the target slot
std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
@@ -351,12 +351,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
auto iter =
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
// Seek to the beginning of keys starting with 'prefix' and iterate all these keys
- auto current_slot = slot_range.start;
+ int current_slot = slot_range.start;
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
// The migrating task has to be stopped, if server role is changed from
master to slave
// or flush command (flushdb or flushall) is executed
if (stop_migration_) {
- return {Status::NotOK, errMigrationTaskCanceled};
+ return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}
// Iteration is out of range
@@ -366,7 +366,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
}
// Get user key
- auto [_, user_key] = ExtractNamespaceKey(iter->key(), true);
+ auto [_, user_key] = ExtractNamespaceKey(iter->key(),
/*slot_id_encoded=*/true);
// Add key's constructed commands to restore_cmds, send pipeline or not
according to task's max_pipeline_size
auto result = migrateOneKey(user_key, iter->value(), &restore_cmds);
@@ -429,7 +429,7 @@ Status SlotMigrator::syncWALByCmd() {
Status SlotMigrator::finishSuccessfulMigration() {
if (stop_migration_) {
- return {Status::NotOK, errMigrationTaskCanceled};
+ return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}
// Set import status on the destination node to SUCCESS
@@ -723,6 +723,12 @@ StatusOr<KeyMigrationResult>
SlotMigrator::migrateOneKey(const rocksdb::Slice &k
}
break;
}
+ case kRedisHyperLogLog: {
+ // HyperLogLog migration by cmd is not supported,
+ // since it's hard to restore the same key structure for HyperLogLog
+ // commands.
+ break;
+ }
default:
break;
}
@@ -752,7 +758,17 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice
&key, const Metadata
Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const
Metadata &metadata, std::string *restore_cmds) {
std::string cmd;
- cmd = type_to_cmd[metadata.Type()];
+ {
+ auto iter = type_to_cmd.find(metadata.Type());
+ if (iter != type_to_cmd.end()) {
+ cmd = iter->second;
+ } else {
+ if (metadata.Type() > RedisTypeNames.size()) {
+ return {Status::NotOK, "unknown key type: " +
std::to_string(metadata.Type())};
+ }
+ return {Status::NotOK, "unsupported complex key type: " +
RedisTypeNames[metadata.Type()]};
+ }
+ }
std::vector<std::string> user_cmd = {cmd, key.ToString()};
// Construct key prefix to iterate values of the complex type user key
@@ -769,7 +785,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice
&key, const Metadata
for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
if (stop_migration_) {
- return {Status::NotOK, errMigrationTaskCanceled};
+ return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}
if (!iter->key().starts_with(prefix_subkey)) {
@@ -811,6 +827,9 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice
&key, const Metadata
user_cmd.emplace_back(iter->value().ToString());
break;
}
+ case kRedisHyperLogLog: {
+ break;
+ }
default:
break;
}
@@ -878,7 +897,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const
StreamMetadata &metad
for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) {
if (stop_migration_) {
- return {Status::NotOK, errMigrationTaskCanceled};
+ return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}
if (!iter->key().starts_with(prefix_key)) {
@@ -964,7 +983,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey
&inkey, std::unique_ptr<
Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) {
if (stop_migration_) {
- return {Status::NotOK, errMigrationTaskCanceled};
+ return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}
// Check pipeline
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 179150b8..1114f2a1 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -35,18 +35,25 @@
#include <vector>
#include "batch_sender.h"
-#include "config.h"
#include "encoding.h"
#include "parse_util.h"
-#include "redis_slot.h"
#include "server/server.h"
#include "slot_import.h"
-#include "stats/stats.h"
#include "status.h"
#include "storage/redis_db.h"
#include "unique_fd.h"
-enum class MigrationType { kRedisCommand = 0, kRawKeyValue };
+enum class MigrationType {
+ /// Use Redis commands to migrate data.
+ /// It will try to extract commands from existing data and log, then replay
+ /// them on the destination node.
+ kRedisCommand = 0,
+ /// Use raw key-value pairs and the "APPLYBATCH" command in kvrocks to migrate data.
+ ///
+ /// If downstream is not compatible with raw key-value, this migration type will
+ /// automatically switch to kRedisCommand.
+ kRawKeyValue
+};
enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };
@@ -111,7 +118,7 @@ class SlotMigrator : public redis::Database {
private:
void loop();
void runMigrationProcess();
- bool isTerminated() { return thread_state_ == ThreadState::Terminated; }
+ bool isTerminated() const { return thread_state_ == ThreadState::Terminated;
}
Status startMigration();
Status sendSnapshot();
Status syncWAL();
@@ -158,11 +165,11 @@ class SlotMigrator : public redis::Database {
enum class ParserState { ArrayLen, BulkLen, BulkData, ArrayData, OneRspEnd };
enum class ThreadState { Uninitialized, Running, Terminated };
- static const int kDefaultMaxPipelineSize = 16;
- static const int kDefaultMaxMigrationSpeed = 4096;
- static const int kDefaultSequenceGapLimit = 10000;
- static const int kMaxItemsInCommand = 16; // number of items in every write
command of complex keys
- static const int kMaxLoopTimes = 10;
+ static constexpr int kDefaultMaxPipelineSize = 16;
+ static constexpr int kDefaultMaxMigrationSpeed = 4096;
+ static constexpr int kDefaultSequenceGapLimit = 10000;
+ static constexpr int kMaxItemsInCommand = 16; // number of items in every
write command of complex keys
+ static constexpr int kMaxLoopTimes = 10;
Server *srv_;
@@ -183,7 +190,7 @@ class SlotMigrator : public redis::Database {
std::thread t_;
std::mutex job_mutex_;
std::condition_variable job_cv_;
- std::unique_ptr<SlotMigrationJob> migration_job_;
+ std::unique_ptr<SlotMigrationJob> migration_job_; // GUARDED_BY(job_mutex_)
std::string dst_node_;
std::string dst_ip_;
diff --git a/src/stats/stats.h b/src/stats/stats.h
index 6fdba09a..e00506a9 100644
--- a/src/stats/stats.h
+++ b/src/stats/stats.h
@@ -41,7 +41,7 @@ enum StatsMetricFlags {
STATS_METRIC_COUNT
};
-const int STATS_METRIC_SAMPLES = 16; // Number of samples per metric
+constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric
struct CommandStat {
std::atomic<uint64_t> calls;