This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 574d438b Refactor kvrocks2redis via rocksdb secondary instance (#1963)
574d438b is described below

commit 574d438b9acde7b45b0666564af4450c057aceea
Author: HaveAnOrangeCat <[email protected]>
AuthorDate: Wed Jan 3 09:55:23 2024 +0800

    Refactor kvrocks2redis via rocksdb secondary instance (#1963)
---
 src/common/db_util.h                               |  20 +++
 src/storage/storage.cc                             |  31 +++-
 src/storage/storage.h                              |   9 +-
 utils/kvrocks2redis/config.cc                      |   8 -
 utils/kvrocks2redis/config.h                       |   3 -
 utils/kvrocks2redis/kvrocks2redis.conf             |   4 -
 utils/kvrocks2redis/main.cc                        |   2 +-
 utils/kvrocks2redis/parser.cc                      |   8 +-
 utils/kvrocks2redis/parser.h                       |  20 +--
 utils/kvrocks2redis/sync.cc                        | 172 ++++++------------
 utils/kvrocks2redis/sync.h                         |   8 +-
 utils/kvrocks2redis/tests/README.md                |  12 +-
 .../kvrocks2redis/tests/append-data-to-kvrocks.py  |  84 ---------
 utils/kvrocks2redis/tests/check_consistency.py     | 132 ++++++++++++++
 utils/kvrocks2redis/tests/populate-kvrocks.py      | 194 +++++++++++++++------
 15 files changed, 397 insertions(+), 310 deletions(-)

diff --git a/src/common/db_util.h b/src/common/db_util.h
index a7379c61..8df34daa 100644
--- a/src/common/db_util.h
+++ b/src/common/db_util.h
@@ -62,6 +62,13 @@ StatusOr<std::unique_ptr<T>> WrapOutPtrToUnique(Args&&... 
args) {
   return rocksdb::DB::OpenForReadOnly(db_options, dbname, column_families, 
handles, dbptr);
 }
 
+[[nodiscard]] inline rocksdb::Status DBOpenForSecondaryInstance(
+    const rocksdb::DBOptions& db_options, const std::string& dbname, const 
std::string& secondary_path,
+    const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+    std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
+  return rocksdb::DB::OpenAsSecondary(db_options, dbname, secondary_path, 
column_families, handles, dbptr);
+}
+
 }  // namespace details
 
 inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(const rocksdb::Options& 
options, const std::string& dbname) {
@@ -95,6 +102,19 @@ inline StatusOr<std::unique_ptr<rocksdb::DB>> 
DBOpenForReadOnly(
       Status::DBOpenErr>(db_options, dbname, column_families, handles);
 }
 
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenAsSecondaryInstance(
+    const rocksdb::DBOptions& db_options, const std::string& dbname, const 
std::string& secondary_path,
+    const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+    std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
+  return details::WrapOutPtrToUnique<
+      rocksdb::DB,
+      static_cast<rocksdb::Status (*)(const rocksdb::DBOptions&, const 
std::string&, const std::string&,
+                                      const 
std::vector<rocksdb::ColumnFamilyDescriptor>&,
+                                      
std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(
+          details::DBOpenForSecondaryInstance),
+      Status::DBOpenErr>(db_options, dbname, secondary_path, column_families, 
handles);
+}
+
 inline StatusOr<std::unique_ptr<rocksdb::BackupEngine>> 
BackupEngineOpen(rocksdb::Env* db_env,
                                                                          const 
rocksdb::BackupEngineOptions& options) {
   return details::WrapOutPtrToUnique<
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index c176db1b..c59be707 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -237,7 +237,7 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options 
&options) {
   return Status::OK();
 }
 
-Status Storage::Open(bool read_only) {
+Status Storage::Open(DBOpenMode mode) {
   auto guard = WriteLockGuard();
   db_closing_ = false;
 
@@ -250,7 +250,7 @@ Status Storage::Open(bool read_only) {
   }
 
   rocksdb::Options options = InitRocksDBOptions();
-  if (!read_only) {
+  if (mode == kDBOpenModeDefault) {
     if (auto s = CreateColumnFamilies(options); !s.IsOK()) {
       return s.Prefixed("failed to create column families");
     }
@@ -316,17 +316,32 @@ Status Storage::Open(bool read_only) {
   if (!s.ok()) return {Status::NotOK, s.ToString()};
 
   auto start = std::chrono::high_resolution_clock::now();
-  auto dbs = read_only ? util::DBOpenForReadOnly(options, config_->db_dir, 
column_families, &cf_handles_)
-                       : util::DBOpen(options, config_->db_dir, 
column_families, &cf_handles_);
+  switch (mode) {
+    case DBOpenMode::kDBOpenModeDefault: {
+      db_ = GET_OR_RET(util::DBOpen(options, config_->db_dir, column_families, 
&cf_handles_));
+      break;
+    }
+    case DBOpenMode::kDBOpenModeForReadOnly: {
+      db_ = GET_OR_RET(util::DBOpenForReadOnly(options, config_->db_dir, 
column_families, &cf_handles_));
+      break;
+    }
+    case DBOpenMode::kDBOpenModeAsSecondaryInstance: {
+      db_ = GET_OR_RET(
+          util::DBOpenAsSecondaryInstance(options, config_->db_dir, 
config_->dir, column_families, &cf_handles_));
+      break;
+    }
+    default:
+      __builtin_unreachable();
+  }
   auto end = std::chrono::high_resolution_clock::now();
   int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end 
- start).count();
-  if (!s.ok()) {
+
+  if (!db_) {
     LOG(INFO) << "[storage] Failed to load the data from disk: " << duration 
<< " ms";
-    return {Status::DBOpenErr, s.ToString()};
+    return {Status::DBOpenErr};
   }
-
-  db_ = std::move(*dbs);
   LOG(INFO) << "[storage] Success to load the data from disk: " << duration << 
" ms";
+
   return Status::OK();
 }
 
diff --git a/src/storage/storage.h b/src/storage/storage.h
index b0de2dd0..570ecf6c 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -29,6 +29,7 @@
 
 #include <atomic>
 #include <cinttypes>
+#include <cstddef>
 #include <memory>
 #include <shared_mutex>
 #include <string>
@@ -51,6 +52,12 @@ enum ColumnFamilyID {
   kColumnFamilyIDStream,
 };
 
+enum DBOpenMode {
+  kDBOpenModeDefault,
+  kDBOpenModeForReadOnly,
+  kDBOpenModeAsSecondaryInstance,
+};
+
 namespace engine {
 
 constexpr const char *kPubSubColumnFamilyName = "pubsub";
@@ -100,7 +107,7 @@ class Storage {
   ~Storage();
 
   void SetWriteOptions(const Config::RocksDB::WriteOptions &config);
-  Status Open(bool read_only = false);
+  Status Open(DBOpenMode mode = kDBOpenModeDefault);
   void CloseDB();
   void EmptyDB();
   rocksdb::BlockBasedTableOptions InitTableOptions();
diff --git a/utils/kvrocks2redis/config.cc b/utils/kvrocks2redis/config.cc
index 8e48f8ff..ac6f24ed 100644
--- a/utils/kvrocks2redis/config.cc
+++ b/utils/kvrocks2redis/config.cc
@@ -87,14 +87,6 @@ Status Config::parseConfigFromString(const std::string 
&input) {
     }
   } else if (size == 1 && key == "pidfile") {
     pidfile = args[0];
-  } else if (size >= 2 && key == "kvrocks") {
-    kvrocks_host = args[0];
-    // In new versions, we don't use extra port to implement replication
-    kvrocks_port = 
GET_OR_RET(ParseInt<std::uint16_t>(args[1]).Prefixed("kvrocks port number"));
-
-    if (size == 3) {
-      kvrocks_auth = args[2];
-    }
   } else if (size == 1 && key == "cluster-enable") {
     // Renamed to cluster-enabled, keeping the old one for compatibility.
     cluster_enabled = GET_OR_RET(yesnotoi(args[0]).Prefixed("key 
'cluster-enable'"));
diff --git a/utils/kvrocks2redis/config.h b/utils/kvrocks2redis/config.h
index e67cae19..a9a4bf3b 100644
--- a/utils/kvrocks2redis/config.h
+++ b/utils/kvrocks2redis/config.h
@@ -48,9 +48,6 @@ struct Config {
   std::string next_offset_file_name = "last_next_offset.txt";
   std::string next_seq_file_path = output_dir + "/last_next_seq.txt";
 
-  std::string kvrocks_auth;
-  std::string kvrocks_host;
-  int kvrocks_port = 0;
   std::map<std::string, RedisServer> tokens;
   bool cluster_enabled = false;
 
diff --git a/utils/kvrocks2redis/kvrocks2redis.conf 
b/utils/kvrocks2redis/kvrocks2redis.conf
index 33a19555..c3d1cbca 100644
--- a/utils/kvrocks2redis/kvrocks2redis.conf
+++ b/utils/kvrocks2redis/kvrocks2redis.conf
@@ -22,10 +22,6 @@ data-dir ./data
 # Default: ./
 output-dir ./
 
-# Sync kvrocks node. Use the node's Psync command to get the newest wal raw 
write_batch.
-#
-# kvrocks <kvrocks_ip> <kvrocks_port> [<kvrocks_auth>]
-kvrocks 127.0.0.1 6666
 
 # Enable cluster mode.
 #
diff --git a/utils/kvrocks2redis/main.cc b/utils/kvrocks2redis/main.cc
index 767c2f19..e6ca93bf 100644
--- a/utils/kvrocks2redis/main.cc
+++ b/utils/kvrocks2redis/main.cc
@@ -122,7 +122,7 @@ int main(int argc, char *argv[]) {
   kvrocks_config.slot_id_encoded = config.cluster_enabled;
 
   engine::Storage storage(&kvrocks_config);
-  s = storage.Open(true);
+  s = storage.Open(kDBOpenModeAsSecondaryInstance);
   if (!s.IsOK()) {
     LOG(ERROR) << "Failed to open Kvrocks storage: " << s.Msg();
     exit(1);
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index 0c463d9a..86b3e1a5 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -33,14 +33,13 @@
 
 Status Parser::ParseFullDB() {
   rocksdb::DB *db = storage_->GetDB();
-  if (!latest_snapshot_) latest_snapshot_ = 
std::make_unique<LatestSnapShot>(db);
   rocksdb::ColumnFamilyHandle *metadata_cf_handle = 
storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
-
+  // Since an RSI (RocksDB Secondary Instance) does not support snapshot-based 
reads, we don't need to set the snapshot
+  // parameter. However, until we proactively invoke TryCatchUpWithPrimary, 
this replica is read-only, which can be
+  // considered a snapshot.
   rocksdb::ReadOptions read_options;
-  read_options.snapshot = latest_snapshot_->GetSnapShot();
   read_options.fill_cache = false;
   std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_options, 
metadata_cf_handle));
-
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     Metadata metadata(kRedisNone);
     auto ds = metadata.Decode(iter->value());
@@ -91,7 +90,6 @@ Status Parser::parseComplexKV(const Slice &ns_key, const 
Metadata &metadata) {
   std::string next_version_prefix_key = InternalKey(ns_key, "", 
metadata.version + 1, slot_id_encoded_).Encode();
 
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
-  read_options.snapshot = latest_snapshot_->GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix_key);
   read_options.iterate_upper_bound = &upper_bound;
 
diff --git a/utils/kvrocks2redis/parser.h b/utils/kvrocks2redis/parser.h
index 69fe0f8d..09ba2f17 100644
--- a/utils/kvrocks2redis/parser.h
+++ b/utils/kvrocks2redis/parser.h
@@ -33,27 +33,10 @@
 #include "storage/storage.h"
 #include "writer.h"
 
-class LatestSnapShot {
- public:
-  explicit LatestSnapShot(rocksdb::DB *db) : db_(db), 
snapshot_(db_->GetSnapshot()) {}
-  ~LatestSnapShot() { db_->ReleaseSnapshot(snapshot_); }
-
-  LatestSnapShot(const LatestSnapShot &) = delete;
-  LatestSnapShot &operator=(const LatestSnapShot &) = delete;
-
-  const rocksdb::Snapshot *GetSnapShot() { return snapshot_; }
-
- private:
-  rocksdb::DB *db_ = nullptr;
-  const rocksdb::Snapshot *snapshot_ = nullptr;
-};
-
 class Parser {
  public:
   explicit Parser(engine::Storage *storage, Writer *writer)
-      : storage_(storage), writer_(writer), 
slot_id_encoded_(storage_->IsSlotIdEncoded()) {
-    latest_snapshot_ = std::make_unique<LatestSnapShot>(storage->GetDB());
-  }
+      : storage_(storage), writer_(writer), 
slot_id_encoded_(storage_->IsSlotIdEncoded()) {}
   ~Parser() = default;
 
   Status ParseFullDB();
@@ -62,7 +45,6 @@ class Parser {
  protected:
   engine::Storage *storage_ = nullptr;
   Writer *writer_ = nullptr;
-  std::unique_ptr<LatestSnapShot> latest_snapshot_;
   bool slot_id_encoded_ = false;
 
   Status parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t 
expire);
diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc
index c142b942..61303a0a 100644
--- a/utils/kvrocks2redis/sync.cc
+++ b/utils/kvrocks2redis/sync.cc
@@ -43,17 +43,14 @@ Sync::Sync(engine::Storage *storage, Writer *writer, Parser 
*parser, kvrocks2red
     : storage_(storage), writer_(writer), parser_(parser), config_(config) {}
 
 Sync::~Sync() {
-  if (sock_fd_) close(sock_fd_);
   if (next_seq_fd_) close(next_seq_fd_);
   writer_->Stop();
 }
 
 /*
- * Run connect to kvrocks, and start the following steps
- * asynchronously
- *  - TryPsync
- *  - - if ok, IncrementBatchLoop
- *  - - not, parseAllLocalStorage and restart TryPsync when done
+ * 1. Attempt to directly parse the WAL.
+ * 2. If the attempt fails, it is necessary to parse the current snapshot 
first.
+ *    After completion, repeat step 1.
  */
 void Sync::Start() {
   auto s = readNextSeqFromFile(&next_seq_);
@@ -61,39 +58,16 @@ void Sync::Start() {
     LOG(ERROR) << s.Msg();
     return;
   }
-
   LOG(INFO) << "[kvrocks2redis] Start sync the data from kvrocks to redis";
   while (!IsStopped()) {
-    auto sock_fd = util::SockConnect(config_->kvrocks_host, 
config_->kvrocks_port);
-    if (!sock_fd) {
-      LOG(ERROR) << fmt::format("Failed to connect to Kvrocks on {}:{}. Error: 
{}", config_->kvrocks_host,
-                                config_->kvrocks_port, sock_fd.Msg());
-      usleep(10000);
-      continue;
+    s = checkWalBoundary();
+    if (!s.IsOK()) {
+      parseKVFromLocalStorage();
     }
-
-    sock_fd_ = *sock_fd;
-    s = auth();
+    s = incrementBatchLoop();
     if (!s.IsOK()) {
       LOG(ERROR) << s.Msg();
-      usleep(10000);
-      continue;
-    }
-
-    while (!IsStopped()) {
-      s = tryPSync();
-      if (!s.IsOK()) {
-        LOG(ERROR) << s.Msg();
-        break;
-      }
-      LOG(INFO) << "[kvrocks2redis] PSync is ok, start increment batch loop";
-      s = incrementBatchLoop();
-      if (!s.IsOK()) {
-        LOG(ERROR) << s.Msg();
-        continue;
-      }
     }
-    close(sock_fd_);
   }
 }
 
@@ -104,98 +78,68 @@ void Sync::Stop() {
   LOG(INFO) << "[kvrocks2redis] Stopped";
 }
 
-Status Sync::auth() {
-  // Send auth when needed
-  if (!config_->kvrocks_auth.empty()) {
-    const auto auth_command = redis::MultiBulkString({"AUTH", 
config_->kvrocks_auth});
-    auto s = util::SockSend(sock_fd_, auth_command);
-    if (!s) return s.Prefixed("send auth command err");
-    std::string line = GET_OR_RET(util::SockReadLine(sock_fd_).Prefixed("read 
auth response err"));
-    if (line.compare(0, 3, "+OK") != 0) {
-      return {Status::NotOK, "auth got invalid response"};
-    }
-  }
-  LOG(INFO) << "[kvrocks2redis] Auth succ, continue...";
-  return Status::OK();
+Status Sync::tryCatchUpWithPrimary() {
+  auto s = storage_->GetDB()->TryCatchUpWithPrimary();
+  return s.ok() ? Status() : Status::NotOK;
 }
 
-Status Sync::tryPSync() {
-  const auto seq_str = std::to_string(next_seq_);
-  const auto seq_len_str = std::to_string(seq_str.length());
-  const auto cmd_str = "*2" CRLF "$5" CRLF "PSYNC" CRLF "$" + seq_len_str + 
CRLF + seq_str + CRLF;
-  auto s = util::SockSend(sock_fd_, cmd_str);
-  LOG(INFO) << "[kvrocks2redis] Try to use psync, next seq: " << next_seq_;
-  if (!s) return s.Prefixed("send psync command err");
-  std::string line = GET_OR_RET(util::SockReadLine(sock_fd_).Prefixed("read 
psync response err"));
+Status Sync::checkWalBoundary() {
+  if (next_seq_ == storage_->LatestSeqNumber() + 1) {
+    return Status::OK();
+  }
 
-  if (line.compare(0, 3, "+OK") != 0) {
-    if (next_seq_ > 0) {
-      // Ooops, Failed to psync , sync process has been terminated, 
administrator should be notified
-      // when full sync is needed, please remove last_next_seq config file, 
and restart kvrocks2redis
-      auto error_msg =
-          "[kvrocks2redis] CRITICAL - Failed to psync , please remove"
-          " last_next_seq config file, and restart kvrocks2redis, redis reply: 
" +
-          std::string(line);
-      stop_flag_ = true;
-      return {Status::NotOK, error_msg};
+  // Upper bound
+  if (next_seq_ > storage_->LatestSeqNumber() + 1) {
+    return {Status::NotOK};
+  }
+
+  // Lower bound
+  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
+  auto s = storage_->GetWALIter(next_seq_, &iter);
+  if (s.IsOK() && iter->Valid()) {
+    auto batch = iter->GetBatch();
+    if (next_seq_ != batch.sequence) {
+      if (next_seq_ > batch.sequence) {
+        LOG(ERROR) << "checkWALBoundary with sequence: " << next_seq_
+                   << ", but GetWALIter returned older sequence: " << 
batch.sequence;
+      }
+      return {Status::NotOK};
     }
-    // PSYNC isn't OK, we should use parseAllLocalStorage
-    // Switch to parseAllLocalStorage
-    LOG(INFO) << "[kvrocks2redis] Failed to psync, redis reply: " << 
std::string(line);
-    parseKVFromLocalStorage();
-    // Restart tryPSync
-    return tryPSync();
+    return Status::OK();
   }
-  return Status::OK();
+  return {Status::NotOK};
 }
 
 Status Sync::incrementBatchLoop() {
-  std::cout << "Start parse increment batch ..." << std::endl;
-  evbuffer *evbuf = evbuffer_new();
+  LOG(INFO) << "[kvrocks2redis] Start parsing increment data";
+  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
   while (!IsStopped()) {
-    if (evbuffer_read(evbuf, sock_fd_, -1) <= 0) {
-      evbuffer_free(evbuf);
-      return {Status::NotOK, std::string("[kvrocks2redis] read increment batch 
err: ") + strerror(errno)};
+    if (!tryCatchUpWithPrimary().IsOK()) {
+      return {Status::NotOK};
     }
-    if (incr_state_ == IncrementBatchLoopState::Incr_batch_size) {
-      // Read bulk length
-      UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
-      if (!line) {
-        usleep(10000);
-        continue;
-      }
-      incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, 
nullptr, 10) : 0;
-      if (incr_bulk_len_ == 0) {
-        return {Status::NotOK, "[kvrocks2redis] Invalid increment data size"};
-      }
-      incr_state_ = Incr_batch_data;
-    }
-
-    if (incr_state_ == IncrementBatchLoopState::Incr_batch_data) {
-      // Read bulk data (batch data)
-      if (incr_bulk_len_ + 2 <= evbuffer_get_length(evbuf)) {  // We got 
enough data
-        char *bulk_data = reinterpret_cast<char *>(evbuffer_pullup(evbuf, 
static_cast<ssize_t>(incr_bulk_len_) + 2));
-        std::string bulk_data_str = std::string(bulk_data, incr_bulk_len_);
-        // Skip the ping packet
-        if (bulk_data_str != "ping") {
-          auto bat = rocksdb::WriteBatch(bulk_data_str);
-          int count = static_cast<int>(bat.Count());
-          auto s = parser_->ParseWriteBatch(bulk_data_str);
-          if (!s.IsOK()) {
-            return s.Prefixed(fmt::format("failed to parse write batch '{}'", 
util::StringToHex(bulk_data_str)));
-          }
-
-          s = updateNextSeq(next_seq_ + count);
-          if (!s.IsOK()) {
-            return s.Prefixed("failed to update next sequence");
+    if (next_seq_ <= storage_->LatestSeqNumber()) {
+      storage_->GetDB()->GetUpdatesSince(next_seq_, &iter);
+      for (; iter->Valid(); iter->Next()) {
+        auto batch = iter->GetBatch();
+        if (batch.sequence != next_seq_) {
+          if (next_seq_ > batch.sequence) {
+            LOG(ERROR) << "incrementBatchLoop with sequence: " << next_seq_
+                       << ", but GetUpdatesSince returned older sequence: " << 
batch.sequence;
           }
+          return {Status::NotOK};
+        }
+        auto s = parser_->ParseWriteBatch(batch.writeBatchPtr->Data());
+        if (!s.IsOK()) {
+          return s.Prefixed(
+              fmt::format("failed to parse write batch '{}'", 
util::StringToHex(batch.writeBatchPtr->Data())));
+        }
+        s = updateNextSeq(next_seq_ + batch.writeBatchPtr->Count());
+        if (!s.IsOK()) {
+          return s.Prefixed("failed to update next sequence");
         }
-        evbuffer_drain(evbuf, incr_bulk_len_ + 2);
-        incr_state_ = Incr_batch_size;
-      } else {
-        usleep(10000);
-        continue;
       }
+    } else {
+      usleep(10000);
     }
   }
   return Status::OK();
@@ -217,8 +161,8 @@ void Sync::parseKVFromLocalStorage() {
     LOG(ERROR) << "[kvrocks2redis] Failed to parse full db, encounter error: " 
<< s.Msg();
     return;
   }
-
-  s = updateNextSeq(storage_->LatestSeqNumber() + 1);
+  auto last_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  s = updateNextSeq(last_seq + 1);
   if (!s.IsOK()) {
     LOG(ERROR) << "[kvrocks2redis] Failed to update next sequence: " << 
s.Msg();
   }
diff --git a/utils/kvrocks2redis/sync.h b/utils/kvrocks2redis/sync.h
index 66b9ffae..5c746b84 100644
--- a/utils/kvrocks2redis/sync.h
+++ b/utils/kvrocks2redis/sync.h
@@ -46,7 +46,6 @@ class Sync {
   bool IsStopped() const { return stop_flag_; }
 
  private:
-  int sock_fd_;
   bool stop_flag_ = false;
   engine::Storage *storage_ = nullptr;
   Writer *writer_ = nullptr;
@@ -61,12 +60,11 @@ class Sync {
     Incr_batch_data,
   } incr_state_ = Incr_batch_size;
 
-  size_t incr_bulk_len_ = 0;
-
-  Status auth();
-  Status tryPSync();
   Status incrementBatchLoop();
 
+  Status tryCatchUpWithPrimary();
+  Status checkWalBoundary();
+
   void parseKVFromLocalStorage();
 
   Status updateNextSeq(rocksdb::SequenceNumber seq);
diff --git a/utils/kvrocks2redis/tests/README.md 
b/utils/kvrocks2redis/tests/README.md
index 17c6d687..6bc6048d 100644
--- a/utils/kvrocks2redis/tests/README.md
+++ b/utils/kvrocks2redis/tests/README.md
@@ -12,9 +12,11 @@ For testing the `kvrocks2redis` utility, manually check 
generate AOF.
 
 ```bash
 # populate data
-python populate-kvrocks.py  
-# check generated aof file 
-# append new data 
-python append-data-to-kvrocks.py
-# check appended new aof data  
+python3 populate-kvrocks.py [--host HOST] [--port PORT] [--password PASSWORD] 
[--flushdb FLUSHDB]
+# check generated aof file & user_key.log file
+
+# check consistency
+python3 check_consistency.py [--src_host SRC_HOST] [--src_port SRC_PORT] 
[--src_password SRC_PASSWORD]
+                          [--dst_host DST_HOST] [--dst_port DST_PORT] 
[--dst_password DST_PASSWORD]
+                          [--key_file KEY_FILE]
 ```
diff --git a/utils/kvrocks2redis/tests/append-data-to-kvrocks.py 
b/utils/kvrocks2redis/tests/append-data-to-kvrocks.py
deleted file mode 100644
index 6cfb3211..00000000
--- a/utils/kvrocks2redis/tests/append-data-to-kvrocks.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import redis
-
-range=100
-factor=32
-port=6666
-
-r = redis.StrictRedis(host='localhost', port=port, db=0, password='foobared')
-
-# string
-rst = r.set('foo', 2)  # update old
-assert rst
-rst = r.set('foo2', 2)  # add new
-assert rst
-
-rst = r.setex('foo_ex', 7200, 2)
-assert rst
-
-# zset
-rst = r.zadd('zfoo', 4, 'd')
-assert(rst == 1)
-rst = r.zrem('zfoo', 'd')
-assert(rst == 1)
-
-# list
-rst = r.lset('lfoo', 0, 'a')
-assert(rst == 1)
-rst = r.rpush('lfoo', 'a')
-assert(rst == 5)
-rst = r.lpush('lfoo', 'b')
-assert(rst == 6)
-rst = r.lpop('lfoo')
-assert(rst == 'b')
-rst = r.rpop('lfoo')
-assert(rst == 'a')
-rst = r.ltrim('lfoo', 0, 2)
-assert rst
-
-# set
-rst = r.sadd('sfoo', 'f')
-assert(rst == 1)
-rst = r.srem('sfoo', 'f')
-assert(rst == 1)
-
-# hash
-rst = r.hset('hfoo', 'b', 2)
-assert(rst == 1)
-rst = r.hdel('hfoo', 'b')
-assert(rst == 1)
-
-# bitmap
-rst = r.setbit('bfoo', 0, 0)  # update old
-assert(rst == 1)
-rst = r.setbit('bfoo', 900000, 1)  # add new
-assert(rst == 0)
-
-# expire cmd
-rst = r.expire('foo', 7200)
-assert rst
-rst = r.expire('zfoo', 7200)
-assert rst
-
-# del cmd
-rst = r.delete('foo')
-assert rst
-rst = r.delete('zfoo')
-assert rst
-
diff --git a/utils/kvrocks2redis/tests/check_consistency.py 
b/utils/kvrocks2redis/tests/check_consistency.py
new file mode 100644
index 00000000..3a176cc8
--- /dev/null
+++ b/utils/kvrocks2redis/tests/check_consistency.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import redis
+import codecs
+import argparse
+import time
+
+class RedisComparator:
+    def __init__(self, src_host, src_port, src_password, dst_host, dst_port, 
dst_password):
+        self.src_cli = self._get_redis_client(src_host, src_port, src_password)
+        self.dst_cli = self._get_redis_client(dst_host, dst_port, dst_password)
+
+    def _get_redis_client(self, host, port, password):
+        return redis.Redis(host=host, port=port, decode_responses=True, 
password=password)
+
+    def _compare_string_data(self, key):
+        src_data = self.src_cli.get(key)
+        dst_data = self.dst_cli.get(key)
+        return src_data, dst_data
+
+    def _compare_hash_data(self, key):
+        src_data = self.src_cli.hgetall(key)
+        dst_data = self.dst_cli.hgetall(key)
+        return src_data, dst_data
+
+    def _compare_list_data(self, key):
+        src_data = self.src_cli.lrange(key, 0, -1)
+        dst_data = self.dst_cli.lrange(key, 0, -1)
+        return src_data, dst_data
+
+    def _compare_set_data(self, key):
+        src_data = self.src_cli.smembers(key)
+        dst_data = self.dst_cli.smembers(key)
+        return src_data, dst_data
+
+    def _compare_zset_data(self, key):
+        src_data = self.src_cli.zrange(key, 0, -1, withscores=True)
+        dst_data = self.dst_cli.zrange(key, 0, -1, withscores=True)
+        return src_data, dst_data
+
+    def _compare_bitmap_data(self, key, pos):
+        src_data = self.src_cli.getbit(key, pos)
+        dst_data = self.dst_cli.getbit(key, pos)
+        return src_data, dst_data
+
+    def _compare_data(self, keys : list, data_type):
+        if data_type == "string":
+            return self._compare_string_data(keys[0])
+        elif data_type == "hash":
+            return self._compare_hash_data(keys[0])
+        elif data_type == "list":
+            return self._compare_list_data(keys[0])
+        elif data_type == "set":
+            return self._compare_set_data(keys[0])
+        elif data_type == "zset":
+            return self._compare_zset_data(keys[0])
+        elif data_type == 'bitmap':
+            return self._compare_bitmap_data(keys[0], keys[1])
+        elif data_type == 'none':
+            return self.src_cli.type(keys[0]), 'none'
+        else:
+            raise ValueError(f"Unsupported data type '{data_type}' for key 
'{keys[0]}'")
+
+    def compare_redis_data(self, key_file=''):
+        if key_file:
+            with open(key_file, 'rb') as f:
+                for line in f:
+                    keys = codecs.decode(line.strip()).split('-')
+                    data_type = self.src_cli.type(keys[0])
+                    src_data, dst_data = self._compare_data(keys, data_type)
+                    if src_data != dst_data:
+                        raise AssertionError(f"Data mismatch for key 
'{keys[0]}': source data: '{src_data}' destination data: '{dst_data}'")
+
+        self._import_and_compare(100)
+        print('All tests passed.')
+
+    def _import_and_compare(self, num):
+        for i in range(num):
+            key = f'key_{i}'
+            value = f'value_{i}'
+            self.src_cli.set(key, value)
+            incr_key = 'incr_key'
+            self.src_cli.incr(incr_key)
+            hash_key = f'hash_key_{i}'
+            hash_value = {'field1': f'field1_value_{i}', 'field2': 
f'field2_value_{i}'}
+            self.src_cli.hmset(hash_key, hash_value)
+            set_key = f'set_key_{i}'
+            set_value = [f'set_value_{i}_1', f'set_value_{i}_2', 
f'set_value_{i}_3']
+            self.src_cli.sadd(set_key, *set_value)
+            zset_key = f'zset_key_{i}'
+            zset_value = {f'member_{i}_1': i+1, f'member_{i}_2': i+2, 
f'member_{i}_3': i+3}
+            self.src_cli.zadd(zset_key, zset_value)
+            time.sleep(0.02)
+            keys = [key, incr_key, hash_key, set_key, zset_key]
+            for key in keys:
+                data_type = self.src_cli.type(key)
+                src_data, dst_data = self._compare_data([key], data_type)
+                if src_data != dst_data:
+                    raise AssertionError(f"Data mismatch for key '{key}': 
source data: '{src_data}' destination data: '{dst_data}'")
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='Redis Comparator')
+    parser.add_argument('--src_host', type=str, default='127.0.0.1', 
help='Source Redis host')
+    parser.add_argument('--src_port', type=int, default=6666, help='Source 
Redis port')
+    parser.add_argument('--src_password', type=str, default='foobared', 
help='Source Redis password')
+    parser.add_argument('--dst_host', type=str, default='127.0.0.1', 
help='Destination Redis host')
+    parser.add_argument('--dst_port', type=int, default=6379, 
help='Destination Redis port')
+    parser.add_argument('--dst_password', type=str, default='', 
help='Destination Redis password')
+    parser.add_argument('--key_file', type=str, help='Path to the file 
containing keys to compare')
+
+    args = parser.parse_args()
+
+    redis_comparator = RedisComparator(args.src_host, args.src_port, 
args.src_password, args.dst_host, args.dst_port, args.dst_password)
+    redis_comparator.compare_redis_data(args.key_file)
\ No newline at end of file
diff --git a/utils/kvrocks2redis/tests/populate-kvrocks.py 
b/utils/kvrocks2redis/tests/populate-kvrocks.py
index cad3d2d4..8883a0df 100644
--- a/utils/kvrocks2redis/tests/populate-kvrocks.py
+++ b/utils/kvrocks2redis/tests/populate-kvrocks.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,57 +17,143 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import time
+import argparse
 import redis
-
-range=100
-factor=32
-port=6666
-
-r = redis.StrictRedis(host='localhost', port=port, db=0, password='foobared')
-
-# flushall ?
-# rst = r.flushall()
-# assert rst
-
-# string
-rst = r.set('foo', 1)
-assert rst
-
-rst = r.setex('foo_ex', 3600, 1)
-assert rst
-
-# zset
-rst = r.zadd('zfoo', 1, 'a', 2, 'b', 3, 'c')
-assert(rst == 3)
-
-# list
-rst = r.rpush('lfoo', 1, 2, 3, 4)
-assert(rst == 4)
-
-# set
-rst = r.sadd('sfoo', 'a', 'b', 'c', 'd')
-assert(rst == 4)
-
-# hash
-rst = r.hset('hfoo', 'a', 1)
-assert(rst == 1)
-
-# bitmap
-rst = r.setbit('bfoo', 0, 1)
-assert(rst == 0)
-rst = r.setbit('bfoo', 1, 1)
-assert(rst == 0)
-rst = r.setbit('bfoo', 800000, 1)
-assert(rst == 0)
-
-# expire cmd
-rst = r.expire('foo', 3600)
-assert rst
-rst = r.expire('zfoo', 3600)
-assert rst
-
-
-
-
-
-
+import sys
+
+
# Every key touched by the test cases below is appended to this log file; the
# companion check_consistency.py script reads it back (via --key_file) to
# verify that kvrocks2redis reproduced each key on the destination Redis.
filename = 'user_key.log'
# NOTE(review): module-global handle, never closed explicitly — relies on
# interpreter shutdown to flush; confirm this is acceptable for the test.
file = open(filename, 'w')

# Initial population: one group per data type.  Each group is
# (case name, [ [command tuple, expected reply], ... ]); the expected reply
# may also be a callable predicate over the actual reply (see check()).
# Integer expectations compare equal to boolean replies (1 == True).
PopulateCases = [
    ('string', [
        [('set', 'foo', 1), True],
        [('setex', 'foo_ex', 3600, 1), True],
    ]),
    ('zset', [
        [('zadd', 'zfoo', 1, 'a', 2, 'b', 3, 'c'), 3]
    ]),
    ('list', [
        [('rpush', 'lfoo', 1, 2, 3, 4), 4]
    ]),
    ('set', [
        [('sadd', 'sfoo', 'a', 'b', 'c', 'd'), 4]
    ]),
    ('hash', [
        [('hset', 'hfoo', 'a', 1), 1]
    ]),
    ('bitmap', [
        [('setbit', 'bfoo', 0, 1), 0],
        [('setbit', 'bfoo', 1, 1), 0],
        [('setbit', 'bfoo', 800000, 1), 0]
    ]),
    ('expire', [
        [('expire', 'foo', 3600), True],
        [('expire', 'zfoo', 3600), True]
    ])
]

# Follow-up mutations run after PopulateCases: updates, removals, expiries and
# deletes, so the sync path is exercised for modifications, not just inserts.
# Expected replies assume the PopulateCases state (e.g. rpush -> 5).
AppendCases = [
    ('string', [
        [('set', 'foo', 2), True],
        [('set', 'foo2', 2), True],
        [('setex', 'foo_ex', 7200, 2), True]
    ]),
    ('zset', [
        [('zadd', 'zfoo', 4, 'd'), 1],
        [('zrem', 'zfoo', 'd'), 1]
    ]),
    ('list', [
        [('lset', 'lfoo', 0, 'a'), 1],
        [('rpush', 'lfoo', 'a'), 5],
        [('lpush', 'lfoo', 'b'), 6],
        [('lpop', 'lfoo'), 'b'],
        [('rpop', 'lfoo'), 'a'],
        [('ltrim', 'lfoo', 0, 2), True]
    ]),
    ('set', [
        [('sadd', 'sfoo', 'f'), 1],
        [('srem', 'sfoo', 'f'), 1]
    ]),
    ('hash', [
        [('hset', 'hfoo', 'b', 2), 1],
        [('hdel', 'hfoo', 'b'), 1]
    ]),
    ('bitmap', [
        [('setbit', 'bfoo', 0, 0), 1],
        [('setbit', 'bfoo', 900000, 1), 0]
    ]),
    ('expire', [
        [('expire', 'foo', 7200), True],
        [('expire', 'zfoo', 7200), True]
    ]),
    ('delete', [
        [('del', 'foo'), True],
        [('del', 'zfoo'), True]
    ])
]
+
def parse_args(argv=None):
    """Parse command-line options for the populate script.

    argv -- optional explicit argument list (defaults to sys.argv[1:]);
            passing a list makes the function testable without touching
            sys.argv.
    Returns an argparse.Namespace with host, port, password and flushdb.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', default='127.0.0.1', type=str, help='host')
    parser.add_argument('--port', default=6666, type=int, help='port')
    parser.add_argument('--password', default='foobared')
    # Bug fix: the previous `type=str` made ANY non-empty value truthy, so
    # `--flushdb false` still flushed the database.  Interpret common textual
    # booleans instead; only true/1/yes (case-insensitive) enable the flush.
    parser.add_argument('--flushdb', default=False,
                        type=lambda s: s.strip().lower() in ('true', '1', 'yes'),
                        help='need to flushdb')

    return parser.parse_args(argv)
+
def check(cmd, r):
    """Validate one command's reply against its recorded expectation.

    cmd -- [command tuple] or [command tuple, expected]; `expected` is either
           a plain value compared with == or a callable predicate over r.
    r   -- the actual reply returned by the server.
    Returns True when the reply matches (or no expectation was given).
    """
    if len(cmd) == 1:
        # No expectation recorded: just log that the command was executed.
        print('EXEC %s' % (str(cmd[0]),))
        return True

    expected = cmd[1]
    ok = expected(r) if callable(expected) else r == expected
    if ok:
        return True
    print('FAIL %s:%s != %s' % (str(cmd[0]), repr(r), repr(cmd[1])))
    return False
+
def pipeline_execute(client, name, cmds):
    """Run one case group's commands through a single non-transactional
    pipeline and verify every reply with check().

    client -- redis client connected to the kvrocks instance
    name   -- case-group name ('string', 'bitmap', ...)
    cmds   -- list of [command tuple, expected reply] entries
    Returns True when every reply matched its expectation, False otherwise
    (including when any command or the pipeline itself raised).
    """
    succ = True
    p = client.pipeline(transaction=False)
    try:
        for cmd in cmds:
            # Record each user key for the consistency checker.  Bitmap keys
            # are logged as "<key>-<offset>" so every touched bit offset is
            # listed individually.
            if name != 'bitmap':
                file.write(cmd[0][1] + '\n')
            else:
                file.write(f"{cmd[0][1]}-{cmd[0][2]}" + '\n')
            p.execute_command(*cmd[0])
        res = p.execute()
        # Index-based access (not zip) so a short reply list raises and is
        # reported as a failure by the except clause below.
        for i, cmd in enumerate(cmds):
            if not check(cmd, res[i]):
                succ = False
    except Exception as excp:
        succ = False
        print('EXCP %s' % str(excp))
    return succ
+
+
+
def run_test(client, cases : list):
    """Execute every case group against `client` and print a summary.

    cases -- list of (group name, [command specs]) tuples; each group is run
             through pipeline_execute() and failing group names are reported.
    """
    failed = [name for name, cmds in cases
              if not pipeline_execute(client, name, cmds)]
    if failed:
        print('******* Some case test fail *******')
        for cmd in failed:
            print(cmd)
    else:
        print('All case passed.')
+
+
if __name__ == '__main__':
    # Connect to the target kvrocks instance, optionally wipe it first, then
    # run the initial population followed by the append/mutation cases.
    opts = parse_args()
    conn = redis.Redis(host=opts.host, port=opts.port,
                       decode_responses=True, password=opts.password)
    if opts.flushdb:
        conn.flushdb()
    for case_group in (PopulateCases, AppendCases):
        run_test(conn, case_group)


Reply via email to