This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new bf56a05f chore: improve the style and document for SlotMigration 
(#2465)
bf56a05f is described below

commit bf56a05f93f6406f67db3bed54114a3e64ffe351
Author: mwish <[email protected]>
AuthorDate: Mon Aug 5 16:34:31 2024 +0800

    chore: improve the style and document for SlotMigration (#2465)
---
 src/cluster/slot_migrate.cc | 49 +++++++++++++++++++++++++++++++--------------
 src/cluster/slot_migrate.h  | 29 +++++++++++++++++----------
 src/stats/stats.h           |  2 +-
 3 files changed, 53 insertions(+), 27 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index f91c9a1a..cd7e9e42 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -34,10 +34,10 @@
 #include "time_util.h"
 #include "types/redis_stream_base.h"
 
-const char *errFailedToSendCommands = "failed to send commands to restore a 
key";
-const char *errMigrationTaskCanceled = "key migration stopped due to a task 
cancellation";
-const char *errFailedToSetImportStatus = "failed to set import status on 
destination node";
-const char *errUnsupportedMigrationType = "unsupported migration type";
+constexpr std::string_view errFailedToSendCommands = "failed to send commands 
to restore a key";
+constexpr std::string_view errMigrationTaskCanceled = "key migration stopped 
due to a task cancellation";
+constexpr std::string_view errFailedToSetImportStatus = "failed to set import 
status on destination node";
+constexpr std::string_view errUnsupportedMigrationType = "unsupported 
migration type";
 
 static std::map<RedisType, std::string> type_to_cmd = {
     {kRedisString, "set"}, {kRedisList, "rpush"},    {kRedisHash, "hmset"},    
  {kRedisSet, "sadd"},
@@ -318,7 +318,7 @@ Status SlotMigrator::sendSnapshot() {
   } else if (migration_type_ == MigrationType::kRawKeyValue) {
     return sendSnapshotByRawKV();
   }
-  return {Status::NotOK, errUnsupportedMigrationType};
+  return {Status::NotOK, std::string(errUnsupportedMigrationType)};
 }
 
 Status SlotMigrator::syncWAL() {
@@ -327,7 +327,7 @@ Status SlotMigrator::syncWAL() {
   } else if (migration_type_ == MigrationType::kRawKeyValue) {
     return syncWALByRawKV();
   }
-  return {Status::NotOK, errUnsupportedMigrationType};
+  return {Status::NotOK, std::string(errUnsupportedMigrationType)};
 }
 
 Status SlotMigrator::sendSnapshotByCmd() {
@@ -337,7 +337,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
   std::string restore_cmds;
   SlotRange slot_range = slot_range_;
 
-  LOG(INFO) << "[migrate] Start migrating snapshot of slot(s)" << 
slot_range.String();
+  LOG(INFO) << "[migrate] Start migrating snapshot of slot(s): " << 
slot_range.String();
 
   // Construct key prefix to iterate the keys belong to the target slot
   std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
@@ -351,12 +351,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
   auto iter = 
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
 
   // Seek to the beginning of keys start with 'prefix' and iterate all these 
keys
-  auto current_slot = slot_range.start;
+  int current_slot = slot_range.start;
   for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
     // The migrating task has to be stopped, if server role is changed from 
master to slave
     // or flush command (flushdb or flushall) is executed
     if (stop_migration_) {
-      return {Status::NotOK, errMigrationTaskCanceled};
+      return {Status::NotOK, std::string(errMigrationTaskCanceled)};
     }
 
     // Iteration is out of range
@@ -366,7 +366,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
     }
 
     // Get user key
-    auto [_, user_key] = ExtractNamespaceKey(iter->key(), true);
+    auto [_, user_key] = ExtractNamespaceKey(iter->key(), 
/*slot_id_encoded=*/true);
 
     // Add key's constructed commands to restore_cmds, send pipeline or not 
according to task's max_pipeline_size
     auto result = migrateOneKey(user_key, iter->value(), &restore_cmds);
@@ -429,7 +429,7 @@ Status SlotMigrator::syncWALByCmd() {
 
 Status SlotMigrator::finishSuccessfulMigration() {
   if (stop_migration_) {
-    return {Status::NotOK, errMigrationTaskCanceled};
+    return {Status::NotOK, std::string(errMigrationTaskCanceled)};
   }
 
   // Set import status on the destination node to SUCCESS
@@ -723,6 +723,12 @@ StatusOr<KeyMigrationResult> 
SlotMigrator::migrateOneKey(const rocksdb::Slice &k
       }
       break;
     }
+    case kRedisHyperLogLog: {
+      // HyperLogLog migration by cmd is not supported,
+      // since it's hard to restore the same key structure for HyperLogLog
+      // commands.
+      break;
+    }
     default:
       break;
   }
@@ -752,7 +758,17 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice 
&key, const Metadata
 
 Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const 
Metadata &metadata, std::string *restore_cmds) {
   std::string cmd;
-  cmd = type_to_cmd[metadata.Type()];
+  {
+    auto iter = type_to_cmd.find(metadata.Type());
+    if (iter != type_to_cmd.end()) {
+      cmd = iter->second;
+    } else {
+      if (metadata.Type() > RedisTypeNames.size()) {
+        return {Status::NotOK, "unknown key type: " + 
std::to_string(metadata.Type())};
+      }
+      return {Status::NotOK, "unsupported complex key type: " + 
RedisTypeNames[metadata.Type()]};
+    }
+  }
 
   std::vector<std::string> user_cmd = {cmd, key.ToString()};
   // Construct key prefix to iterate values of the complex type user key
@@ -769,7 +785,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice 
&key, const Metadata
 
   for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
     if (stop_migration_) {
-      return {Status::NotOK, errMigrationTaskCanceled};
+      return {Status::NotOK, std::string(errMigrationTaskCanceled)};
     }
 
     if (!iter->key().starts_with(prefix_subkey)) {
@@ -811,6 +827,9 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice 
&key, const Metadata
         user_cmd.emplace_back(iter->value().ToString());
         break;
       }
+      case kRedisHyperLogLog: {
+        break;
+      }
       default:
         break;
     }
@@ -878,7 +897,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const 
StreamMetadata &metad
 
   for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) {
     if (stop_migration_) {
-      return {Status::NotOK, errMigrationTaskCanceled};
+      return {Status::NotOK, std::string(errMigrationTaskCanceled)};
     }
 
     if (!iter->key().starts_with(prefix_key)) {
@@ -964,7 +983,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey 
&inkey, std::unique_ptr<
 
 Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) {
   if (stop_migration_) {
-    return {Status::NotOK, errMigrationTaskCanceled};
+    return {Status::NotOK, std::string(errMigrationTaskCanceled)};
   }
 
   // Check pipeline
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 179150b8..1114f2a1 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -35,18 +35,25 @@
 #include <vector>
 
 #include "batch_sender.h"
-#include "config.h"
 #include "encoding.h"
 #include "parse_util.h"
-#include "redis_slot.h"
 #include "server/server.h"
 #include "slot_import.h"
-#include "stats/stats.h"
 #include "status.h"
 #include "storage/redis_db.h"
 #include "unique_fd.h"
 
-enum class MigrationType { kRedisCommand = 0, kRawKeyValue };
+enum class MigrationType {
+  /// Use Redis commands to migrate data.
+  /// It will trying to extract commands from existing data and log, then 
replay
+  /// them on the destination node.
+  kRedisCommand = 0,
+  /// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data.
+  ///
+  /// If downstream is not compatible with raw key-value, this migration type 
will
+  /// auto switch to kRedisCommand.
+  kRawKeyValue
+};
 
 enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };
 
@@ -111,7 +118,7 @@ class SlotMigrator : public redis::Database {
  private:
   void loop();
   void runMigrationProcess();
-  bool isTerminated() { return thread_state_ == ThreadState::Terminated; }
+  bool isTerminated() const { return thread_state_ == ThreadState::Terminated; 
}
   Status startMigration();
   Status sendSnapshot();
   Status syncWAL();
@@ -158,11 +165,11 @@ class SlotMigrator : public redis::Database {
   enum class ParserState { ArrayLen, BulkLen, BulkData, ArrayData, OneRspEnd };
   enum class ThreadState { Uninitialized, Running, Terminated };
 
-  static const int kDefaultMaxPipelineSize = 16;
-  static const int kDefaultMaxMigrationSpeed = 4096;
-  static const int kDefaultSequenceGapLimit = 10000;
-  static const int kMaxItemsInCommand = 16;  // number of items in every write 
command of complex keys
-  static const int kMaxLoopTimes = 10;
+  static constexpr int kDefaultMaxPipelineSize = 16;
+  static constexpr int kDefaultMaxMigrationSpeed = 4096;
+  static constexpr int kDefaultSequenceGapLimit = 10000;
+  static constexpr int kMaxItemsInCommand = 16;  // number of items in every 
write command of complex keys
+  static constexpr int kMaxLoopTimes = 10;
 
   Server *srv_;
 
@@ -183,7 +190,7 @@ class SlotMigrator : public redis::Database {
   std::thread t_;
   std::mutex job_mutex_;
   std::condition_variable job_cv_;
-  std::unique_ptr<SlotMigrationJob> migration_job_;
+  std::unique_ptr<SlotMigrationJob> migration_job_;  // GUARDED_BY(job_mutex_)
 
   std::string dst_node_;
   std::string dst_ip_;
diff --git a/src/stats/stats.h b/src/stats/stats.h
index 6fdba09a..e00506a9 100644
--- a/src/stats/stats.h
+++ b/src/stats/stats.h
@@ -41,7 +41,7 @@ enum StatsMetricFlags {
   STATS_METRIC_COUNT
 };
 
-const int STATS_METRIC_SAMPLES = 16;  // Number of samples per metric
+constexpr int STATS_METRIC_SAMPLES = 16;  // Number of samples per metric
 
 struct CommandStat {
   std::atomic<uint64_t> calls;

Reply via email to