This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 574d438b Refactor kvrocks2redis via rocksdb secondary instance (#1963)
574d438b is described below
commit 574d438b9acde7b45b0666564af4450c057aceea
Author: HaveAnOrangeCat <[email protected]>
AuthorDate: Wed Jan 3 09:55:23 2024 +0800
Refactor kvrocks2redis via rocksdb secondary instance (#1963)
---
src/common/db_util.h | 20 +++
src/storage/storage.cc | 31 +++-
src/storage/storage.h | 9 +-
utils/kvrocks2redis/config.cc | 8 -
utils/kvrocks2redis/config.h | 3 -
utils/kvrocks2redis/kvrocks2redis.conf | 4 -
utils/kvrocks2redis/main.cc | 2 +-
utils/kvrocks2redis/parser.cc | 8 +-
utils/kvrocks2redis/parser.h | 20 +--
utils/kvrocks2redis/sync.cc | 172 ++++++------------
utils/kvrocks2redis/sync.h | 8 +-
utils/kvrocks2redis/tests/README.md | 12 +-
.../kvrocks2redis/tests/append-data-to-kvrocks.py | 84 ---------
utils/kvrocks2redis/tests/check_consistency.py | 132 ++++++++++++++
utils/kvrocks2redis/tests/populate-kvrocks.py | 194 +++++++++++++++------
15 files changed, 397 insertions(+), 310 deletions(-)
diff --git a/src/common/db_util.h b/src/common/db_util.h
index a7379c61..8df34daa 100644
--- a/src/common/db_util.h
+++ b/src/common/db_util.h
@@ -62,6 +62,13 @@ StatusOr<std::unique_ptr<T>> WrapOutPtrToUnique(Args&&...
args) {
return rocksdb::DB::OpenForReadOnly(db_options, dbname, column_families,
handles, dbptr);
}
+// Adapter around rocksdb::DB::OpenAsSecondary with the out-pointer signature
+// (trailing rocksdb::DB**) that WrapOutPtrToUnique is instantiated with in
+// DBOpenAsSecondaryInstance.
+[[nodiscard]] inline rocksdb::Status DBOpenForSecondaryInstance(
+ const rocksdb::DBOptions& db_options, const std::string& dbname, const
std::string& secondary_path,
+ const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+ std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
+ return rocksdb::DB::OpenAsSecondary(db_options, dbname, secondary_path,
column_families, handles, dbptr);
+}
+
} // namespace details
inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(const rocksdb::Options&
options, const std::string& dbname) {
@@ -95,6 +102,19 @@ inline StatusOr<std::unique_ptr<rocksdb::DB>>
DBOpenForReadOnly(
Status::DBOpenErr>(db_options, dbname, column_families, handles);
}
+// Opens `dbname` as a RocksDB secondary instance (rocksdb::DB::OpenAsSecondary),
+// using `secondary_path` as the secondary instance's own directory. Returns the
+// owning DB pointer; failures are reported with Status::DBOpenErr via
+// WrapOutPtrToUnique.
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenAsSecondaryInstance(
+ const rocksdb::DBOptions& db_options, const std::string& dbname, const
std::string& secondary_path,
+ const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+ std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
+ return details::WrapOutPtrToUnique<
+ rocksdb::DB,
+ static_cast<rocksdb::Status (*)(const rocksdb::DBOptions&, const
std::string&, const std::string&,
+ const
std::vector<rocksdb::ColumnFamilyDescriptor>&,
+
std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(
+ details::DBOpenForSecondaryInstance),
+ Status::DBOpenErr>(db_options, dbname, secondary_path, column_families,
handles);
+}
+
inline StatusOr<std::unique_ptr<rocksdb::BackupEngine>>
BackupEngineOpen(rocksdb::Env* db_env,
const
rocksdb::BackupEngineOptions& options) {
return details::WrapOutPtrToUnique<
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index c176db1b..c59be707 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -237,7 +237,7 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options
&options) {
return Status::OK();
}
-Status Storage::Open(bool read_only) {
+Status Storage::Open(DBOpenMode mode) {
auto guard = WriteLockGuard();
db_closing_ = false;
@@ -250,7 +250,7 @@ Status Storage::Open(bool read_only) {
}
rocksdb::Options options = InitRocksDBOptions();
- if (!read_only) {
+ if (mode == kDBOpenModeDefault) {
if (auto s = CreateColumnFamilies(options); !s.IsOK()) {
return s.Prefixed("failed to create column families");
}
@@ -316,17 +316,32 @@ Status Storage::Open(bool read_only) {
if (!s.ok()) return {Status::NotOK, s.ToString()};
auto start = std::chrono::high_resolution_clock::now();
- auto dbs = read_only ? util::DBOpenForReadOnly(options, config_->db_dir,
column_families, &cf_handles_)
- : util::DBOpen(options, config_->db_dir,
column_families, &cf_handles_);
+  // Open according to `mode`. The result is kept in `dbs` rather than unwrapped
+  // with GET_OR_RET: GET_OR_RET returns early on error and would skip the timed
+  // failure log below.
+  auto open_db = [&]() -> StatusOr<std::unique_ptr<rocksdb::DB>> {
+    switch (mode) {
+      case DBOpenMode::kDBOpenModeDefault:
+        return util::DBOpen(options, config_->db_dir, column_families, &cf_handles_);
+      case DBOpenMode::kDBOpenModeForReadOnly:
+        return util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_);
+      case DBOpenMode::kDBOpenModeAsSecondaryInstance:
+        // NOTE(review): config_->dir is passed as the secondary path (the
+        // secondary instance's own directory); confirm a dedicated dir is not needed.
+        return util::DBOpenAsSecondaryInstance(options, config_->db_dir, config_->dir, column_families,
+                                               &cf_handles_);
+    }
+    // DBOpenMode is an unscoped enum, so it can carry a value outside the listed
+    // enumerators; report it instead of invoking __builtin_unreachable() (UB).
+    return {Status::NotOK, "unknown DB open mode"};
+  };
+  auto dbs = open_db();
auto end = std::chrono::high_resolution_clock::now();
int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end
- start).count();
- if (!s.ok()) {
+  if (!dbs) {
LOG(INFO) << "[storage] Failed to load the data from disk: " << duration
<< " ms";
- return {Status::DBOpenErr, s.ToString()};
+    return {Status::DBOpenErr, dbs.Msg()};
}
-
- db_ = std::move(*dbs);
+  db_ = std::move(*dbs);
LOG(INFO) << "[storage] Success to load the data from disk: " << duration <<
" ms";
+
return Status::OK();
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index b0de2dd0..570ecf6c 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -29,6 +29,7 @@
#include <atomic>
#include <cinttypes>
+#include <cstddef>
#include <memory>
#include <shared_mutex>
#include <string>
@@ -51,6 +52,12 @@ enum ColumnFamilyID {
kColumnFamilyIDStream,
};
+// Selects how engine::Storage::Open() opens the RocksDB instance.
+enum DBOpenMode {
+ // Read-write open (util::DBOpen); column families are created first.
+ kDBOpenModeDefault,
+ // Read-only open (util::DBOpenForReadOnly); no column families are created.
+ kDBOpenModeForReadOnly,
+ // RocksDB secondary instance (util::DBOpenAsSecondaryInstance); its view is
+ // refreshed by calling TryCatchUpWithPrimary() on the DB.
+ kDBOpenModeAsSecondaryInstance,
+};
+
namespace engine {
constexpr const char *kPubSubColumnFamilyName = "pubsub";
@@ -100,7 +107,7 @@ class Storage {
~Storage();
void SetWriteOptions(const Config::RocksDB::WriteOptions &config);
- Status Open(bool read_only = false);
+ Status Open(DBOpenMode mode = kDBOpenModeDefault);
void CloseDB();
void EmptyDB();
rocksdb::BlockBasedTableOptions InitTableOptions();
diff --git a/utils/kvrocks2redis/config.cc b/utils/kvrocks2redis/config.cc
index 8e48f8ff..ac6f24ed 100644
--- a/utils/kvrocks2redis/config.cc
+++ b/utils/kvrocks2redis/config.cc
@@ -87,14 +87,6 @@ Status Config::parseConfigFromString(const std::string
&input) {
}
} else if (size == 1 && key == "pidfile") {
pidfile = args[0];
- } else if (size >= 2 && key == "kvrocks") {
- kvrocks_host = args[0];
- // In new versions, we don't use extra port to implement replication
- kvrocks_port =
GET_OR_RET(ParseInt<std::uint16_t>(args[1]).Prefixed("kvrocks port number"));
-
- if (size == 3) {
- kvrocks_auth = args[2];
- }
} else if (size == 1 && key == "cluster-enable") {
// Renamed to cluster-enabled, keeping the old one for compatibility.
cluster_enabled = GET_OR_RET(yesnotoi(args[0]).Prefixed("key
'cluster-enable'"));
diff --git a/utils/kvrocks2redis/config.h b/utils/kvrocks2redis/config.h
index e67cae19..a9a4bf3b 100644
--- a/utils/kvrocks2redis/config.h
+++ b/utils/kvrocks2redis/config.h
@@ -48,9 +48,6 @@ struct Config {
std::string next_offset_file_name = "last_next_offset.txt";
std::string next_seq_file_path = output_dir + "/last_next_seq.txt";
- std::string kvrocks_auth;
- std::string kvrocks_host;
- int kvrocks_port = 0;
std::map<std::string, RedisServer> tokens;
bool cluster_enabled = false;
diff --git a/utils/kvrocks2redis/kvrocks2redis.conf
b/utils/kvrocks2redis/kvrocks2redis.conf
index 33a19555..c3d1cbca 100644
--- a/utils/kvrocks2redis/kvrocks2redis.conf
+++ b/utils/kvrocks2redis/kvrocks2redis.conf
@@ -22,10 +22,6 @@ data-dir ./data
# Default: ./
output-dir ./
-# Sync kvrocks node. Use the node's Psync command to get the newest wal raw
write_batch.
-#
-# kvrocks <kvrocks_ip> <kvrocks_port> [<kvrocks_auth>]
-kvrocks 127.0.0.1 6666
# Enable cluster mode.
#
diff --git a/utils/kvrocks2redis/main.cc b/utils/kvrocks2redis/main.cc
index 767c2f19..e6ca93bf 100644
--- a/utils/kvrocks2redis/main.cc
+++ b/utils/kvrocks2redis/main.cc
@@ -122,7 +122,7 @@ int main(int argc, char *argv[]) {
kvrocks_config.slot_id_encoded = config.cluster_enabled;
engine::Storage storage(&kvrocks_config);
- s = storage.Open(true);
+ // Open as a RocksDB secondary instance; Sync refreshes it with
+ // TryCatchUpWithPrimary() while tailing the WAL.
+ s = storage.Open(kDBOpenModeAsSecondaryInstance);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to open Kvrocks storage: " << s.Msg();
exit(1);
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index 0c463d9a..86b3e1a5 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -33,14 +33,13 @@
Status Parser::ParseFullDB() {
rocksdb::DB *db = storage_->GetDB();
- if (!latest_snapshot_) latest_snapshot_ =
std::make_unique<LatestSnapShot>(db);
rocksdb::ColumnFamilyHandle *metadata_cf_handle =
storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
-
+ // A RocksDB secondary instance does not support snapshot-based reads, so no
+ // snapshot is set here. The secondary's view only advances when
+ // TryCatchUpWithPrimary() is called, so as long as that is not called while
+ // this scan runs, the reads behave like reads from a fixed snapshot.
rocksdb::ReadOptions read_options;
- read_options.snapshot = latest_snapshot_->GetSnapShot();
read_options.fill_cache = false;
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_options,
metadata_cf_handle));
-
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Metadata metadata(kRedisNone);
auto ds = metadata.Decode(iter->value());
@@ -91,7 +90,6 @@ Status Parser::parseComplexKV(const Slice &ns_key, const
Metadata &metadata) {
std::string next_version_prefix_key = InternalKey(ns_key, "",
metadata.version + 1, slot_id_encoded_).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
- read_options.snapshot = latest_snapshot_->GetSnapShot();
+ // No snapshot: secondary instances do not support snapshot-based reads.
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
diff --git a/utils/kvrocks2redis/parser.h b/utils/kvrocks2redis/parser.h
index 69fe0f8d..09ba2f17 100644
--- a/utils/kvrocks2redis/parser.h
+++ b/utils/kvrocks2redis/parser.h
@@ -33,27 +33,10 @@
#include "storage/storage.h"
#include "writer.h"
-class LatestSnapShot {
- public:
- explicit LatestSnapShot(rocksdb::DB *db) : db_(db),
snapshot_(db_->GetSnapshot()) {}
- ~LatestSnapShot() { db_->ReleaseSnapshot(snapshot_); }
-
- LatestSnapShot(const LatestSnapShot &) = delete;
- LatestSnapShot &operator=(const LatestSnapShot &) = delete;
-
- const rocksdb::Snapshot *GetSnapShot() { return snapshot_; }
-
- private:
- rocksdb::DB *db_ = nullptr;
- const rocksdb::Snapshot *snapshot_ = nullptr;
-};
-
class Parser {
public:
+ // `storage` and `writer` are borrowed, not owned (the destructor is defaulted).
+ // Reads go straight to the storage's DB; no snapshot is held.
explicit Parser(engine::Storage *storage, Writer *writer)
- : storage_(storage), writer_(writer),
slot_id_encoded_(storage_->IsSlotIdEncoded()) {
- latest_snapshot_ = std::make_unique<LatestSnapShot>(storage->GetDB());
- }
+ : storage_(storage), writer_(writer),
slot_id_encoded_(storage_->IsSlotIdEncoded()) {}
~Parser() = default;
Status ParseFullDB();
@@ -62,7 +45,6 @@ class Parser {
protected:
engine::Storage *storage_ = nullptr;
Writer *writer_ = nullptr;
- std::unique_ptr<LatestSnapShot> latest_snapshot_;
bool slot_id_encoded_ = false;
Status parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t
expire);
diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc
index c142b942..61303a0a 100644
--- a/utils/kvrocks2redis/sync.cc
+++ b/utils/kvrocks2redis/sync.cc
@@ -43,17 +43,14 @@ Sync::Sync(engine::Storage *storage, Writer *writer, Parser
*parser, kvrocks2red
: storage_(storage), writer_(writer), parser_(parser), config_(config) {}
Sync::~Sync() {
- if (sock_fd_) close(sock_fd_);
if (next_seq_fd_) close(next_seq_fd_);
writer_->Stop();
}
/*
- * Run connect to kvrocks, and start the following steps
- * asynchronously
- * - TryPsync
- * - - if ok, IncrementBatchLoop
- * - - not, parseAllLocalStorage and restart TryPsync when done
+ * Sync loop, repeated until stopped:
+ * 1. checkWalBoundary(): can the WAL be resumed exactly at next_seq_?
+ * 2. If not, parse the whole current DB view (parseKVFromLocalStorage), which
+ *    also resets next_seq_ to just after that view.
+ * 3. incrementBatchLoop(): tail the WAL; if it fails, start again at step 1.
*/
void Sync::Start() {
auto s = readNextSeqFromFile(&next_seq_);
@@ -61,39 +58,16 @@ void Sync::Start() {
LOG(ERROR) << s.Msg();
return;
}
-
LOG(INFO) << "[kvrocks2redis] Start sync the data from kvrocks to redis";
while (!IsStopped()) {
- auto sock_fd = util::SockConnect(config_->kvrocks_host,
config_->kvrocks_port);
- if (!sock_fd) {
- LOG(ERROR) << fmt::format("Failed to connect to Kvrocks on {}:{}. Error:
{}", config_->kvrocks_host,
- config_->kvrocks_port, sock_fd.Msg());
- usleep(10000);
- continue;
+ s = checkWalBoundary();
+ if (!s.IsOK()) {
+    LOG(INFO) << "[kvrocks2redis] Cannot resume from the WAL at sequence " << next_seq_ << ", parsing the full DB";
+ parseKVFromLocalStorage();
}
-
- sock_fd_ = *sock_fd;
- s = auth();
+ s = incrementBatchLoop();
if (!s.IsOK()) {
LOG(ERROR) << s.Msg();
+    // Back off before retrying so a persistent failure (e.g. catch-up errors)
+    // does not turn this loop into a busy spin.
usleep(10000);
- continue;
- }
-
- while (!IsStopped()) {
- s = tryPSync();
- if (!s.IsOK()) {
- LOG(ERROR) << s.Msg();
- break;
- }
- LOG(INFO) << "[kvrocks2redis] PSync is ok, start increment batch loop";
- s = incrementBatchLoop();
- if (!s.IsOK()) {
- LOG(ERROR) << s.Msg();
- continue;
- }
}
- close(sock_fd_);
}
}
@@ -104,98 +78,68 @@ void Sync::Stop() {
LOG(INFO) << "[kvrocks2redis] Stopped";
}
-Status Sync::auth() {
- // Send auth when needed
- if (!config_->kvrocks_auth.empty()) {
- const auto auth_command = redis::MultiBulkString({"AUTH",
config_->kvrocks_auth});
- auto s = util::SockSend(sock_fd_, auth_command);
- if (!s) return s.Prefixed("send auth command err");
- std::string line = GET_OR_RET(util::SockReadLine(sock_fd_).Prefixed("read
auth response err"));
- if (line.compare(0, 3, "+OK") != 0) {
- return {Status::NotOK, "auth got invalid response"};
- }
- }
- LOG(INFO) << "[kvrocks2redis] Auth succ, continue...";
- return Status::OK();
+// Refreshes the secondary instance's view of the primary's data. On failure the
+// RocksDB error text is kept so callers can log why catching up failed.
+Status Sync::tryCatchUpWithPrimary() {
+ auto s = storage_->GetDB()->TryCatchUpWithPrimary();
+  if (!s.ok()) return {Status::NotOK, s.ToString()};
+  return Status::OK();
}
-Status Sync::tryPSync() {
- const auto seq_str = std::to_string(next_seq_);
- const auto seq_len_str = std::to_string(seq_str.length());
- const auto cmd_str = "*2" CRLF "$5" CRLF "PSYNC" CRLF "$" + seq_len_str +
CRLF + seq_str + CRLF;
- auto s = util::SockSend(sock_fd_, cmd_str);
- LOG(INFO) << "[kvrocks2redis] Try to use psync, next seq: " << next_seq_;
- if (!s) return s.Prefixed("send psync command err");
- std::string line = GET_OR_RET(util::SockReadLine(sock_fd_).Prefixed("read
psync response err"));
+// Returns OK when incremental sync can resume exactly at next_seq_. That holds
+// when next_seq_ is one past the latest sequence this instance sees (nothing new
+// yet), or when the first WAL batch returned for next_seq_ starts at next_seq_.
+// Otherwise it returns NotOK, and Sync::Start() falls back to a full parse.
+Status Sync::checkWalBoundary() {
+ if (next_seq_ == storage_->LatestSeqNumber() + 1) {
+ return Status::OK();
+ }
- if (line.compare(0, 3, "+OK") != 0) {
- if (next_seq_ > 0) {
- // Ooops, Failed to psync , sync process has been terminated,
administrator should be notified
- // when full sync is needed, please remove last_next_seq config file,
and restart kvrocks2redis
- auto error_msg =
- "[kvrocks2redis] CRITICAL - Failed to psync , please remove"
- " last_next_seq config file, and restart kvrocks2redis, redis reply:
" +
- std::string(line);
- stop_flag_ = true;
- return {Status::NotOK, error_msg};
+ // Upper bound: next_seq_ is ahead of everything this instance has seen.
+ if (next_seq_ > storage_->LatestSeqNumber() + 1) {
+ return {Status::NotOK};
+ }
+
+ // Lower bound: GetWALIter() must succeed and its first batch must start
+ // exactly at next_seq_. A batch older than next_seq_ is logged as an error.
+ std::unique_ptr<rocksdb::TransactionLogIterator> iter;
+ auto s = storage_->GetWALIter(next_seq_, &iter);
+ if (s.IsOK() && iter->Valid()) {
+ auto batch = iter->GetBatch();
+ if (next_seq_ != batch.sequence) {
+ if (next_seq_ > batch.sequence) {
+ LOG(ERROR) << "checkWALBoundary with sequence: " << next_seq_
+ << ", but GetWALIter return older sequence: " <<
batch.sequence;
+ }
+ return {Status::NotOK};
}
- // PSYNC isn't OK, we should use parseAllLocalStorage
- // Switch to parseAllLocalStorage
- LOG(INFO) << "[kvrocks2redis] Failed to psync, redis reply: " <<
std::string(line);
- parseKVFromLocalStorage();
- // Restart tryPSync
- return tryPSync();
+ return Status::OK();
}
- return Status::OK();
+ return {Status::NotOK};
}
Status Sync::incrementBatchLoop() {
- std::cout << "Start parse increment batch ..." << std::endl;
- evbuffer *evbuf = evbuffer_new();
+ LOG(INFO) << "[kvrocks2redis] Start parsing increment data";
+ std::unique_ptr<rocksdb::TransactionLogIterator> iter;
while (!IsStopped()) {
- if (evbuffer_read(evbuf, sock_fd_, -1) <= 0) {
- evbuffer_free(evbuf);
- return {Status::NotOK, std::string("[kvrocks2redis] read increment batch
err: ") + strerror(errno)};
+    // Refresh the secondary's view so new writes on the primary become visible.
+    if (auto s = tryCatchUpWithPrimary(); !s.IsOK()) {
+      return s.Prefixed("failed to catch up with primary");
}
- if (incr_state_ == IncrementBatchLoopState::Incr_batch_size) {
- // Read bulk length
- UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
- if (!line) {
- usleep(10000);
- continue;
- }
- incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1,
nullptr, 10) : 0;
- if (incr_bulk_len_ == 0) {
- return {Status::NotOK, "[kvrocks2redis] Invalid increment data size"};
- }
- incr_state_ = Incr_batch_data;
- }
-
- if (incr_state_ == IncrementBatchLoopState::Incr_batch_data) {
- // Read bulk data (batch data)
- if (incr_bulk_len_ + 2 <= evbuffer_get_length(evbuf)) { // We got
enough data
- char *bulk_data = reinterpret_cast<char *>(evbuffer_pullup(evbuf,
static_cast<ssize_t>(incr_bulk_len_) + 2));
- std::string bulk_data_str = std::string(bulk_data, incr_bulk_len_);
- // Skip the ping packet
- if (bulk_data_str != "ping") {
- auto bat = rocksdb::WriteBatch(bulk_data_str);
- int count = static_cast<int>(bat.Count());
- auto s = parser_->ParseWriteBatch(bulk_data_str);
- if (!s.IsOK()) {
- return s.Prefixed(fmt::format("failed to parse write batch '{}'",
util::StringToHex(bulk_data_str)));
- }
-
- s = updateNextSeq(next_seq_ + count);
- if (!s.IsOK()) {
- return s.Prefixed("failed to update next sequence");
+ if (next_seq_ <= storage_->LatestSeqNumber()) {
+      auto st = storage_->GetDB()->GetUpdatesSince(next_seq_, &iter);
+      if (!st.ok()) {
+        // `iter` may still be null here, so it must not be touched.
+        return {Status::NotOK,
+                fmt::format("failed to get WAL updates since sequence {}: {}", next_seq_, st.ToString())};
+      }
+ for (; iter->Valid(); iter->Next()) {
+ auto batch = iter->GetBatch();
+ if (batch.sequence != next_seq_) {
+ if (next_seq_ > batch.sequence) {
+          LOG(ERROR) << "incrementBatchLoop with sequence: " << next_seq_
+                     << ", but GetUpdatesSince returned older sequence: " << batch.sequence;
}
+        return {Status::NotOK,
+                fmt::format("WAL sequence mismatch: expected {}, got {}", next_seq_, batch.sequence)};
+ }
+ auto s = parser_->ParseWriteBatch(batch.writeBatchPtr->Data());
+ if (!s.IsOK()) {
+ return s.Prefixed(
+ fmt::format("failed to parse write batch '{}'",
util::StringToHex(batch.writeBatchPtr->Data())));
+ }
+ s = updateNextSeq(next_seq_ + batch.writeBatchPtr->Count());
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to update next sequence");
}
- evbuffer_drain(evbuf, incr_bulk_len_ + 2);
- incr_state_ = Incr_batch_size;
- } else {
- usleep(10000);
- continue;
}
+ } else {
+ usleep(10000);
}
}
return Status::OK();
@@ -217,8 +161,8 @@ void Sync::parseKVFromLocalStorage() {
LOG(ERROR) << "[kvrocks2redis] Failed to parse full db, encounter error: "
<< s.Msg();
return;
}
-
- s = updateNextSeq(storage_->LatestSeqNumber() + 1);
+ // Resume the WAL right after the view that was just parsed. This presumes no
+ // TryCatchUpWithPrimary() ran since ParseFullDB() (true for Sync::Start());
+ // confirm if new callers are added.
+ auto last_seq = storage_->GetDB()->GetLatestSequenceNumber();
+ s = updateNextSeq(last_seq + 1);
if (!s.IsOK()) {
LOG(ERROR) << "[kvrocks2redis] Failed to update next sequence: " <<
s.Msg();
}
diff --git a/utils/kvrocks2redis/sync.h b/utils/kvrocks2redis/sync.h
index 66b9ffae..5c746b84 100644
--- a/utils/kvrocks2redis/sync.h
+++ b/utils/kvrocks2redis/sync.h
@@ -46,7 +46,6 @@ class Sync {
bool IsStopped() const { return stop_flag_; }
private:
- int sock_fd_;
bool stop_flag_ = false;
engine::Storage *storage_ = nullptr;
Writer *writer_ = nullptr;
@@ -61,12 +60,11 @@ class Sync {
Incr_batch_data,
} incr_state_ = Incr_batch_size;
+ // NOTE(review): incr_state_ looks unused now that incrementBatchLoop() reads
+ // the WAL directly instead of a socket stream; consider removing it.
- size_t incr_bulk_len_ = 0;
-
- Status auth();
- Status tryPSync();
Status incrementBatchLoop();
+ // Refreshes the secondary instance via DB::TryCatchUpWithPrimary().
+ Status tryCatchUpWithPrimary();
+ // Returns OK if incremental sync can resume exactly at next_seq_.
+ Status checkWalBoundary();
+
void parseKVFromLocalStorage();
Status updateNextSeq(rocksdb::SequenceNumber seq);
diff --git a/utils/kvrocks2redis/tests/README.md
b/utils/kvrocks2redis/tests/README.md
index 17c6d687..6bc6048d 100644
--- a/utils/kvrocks2redis/tests/README.md
+++ b/utils/kvrocks2redis/tests/README.md
@@ -12,9 +12,11 @@ For testing the `kvrocks2redis` utility, manually check
generate AOF.
```bash
# populate data
-python populate-kvrocks.py
-# check generated aof file
-# append new data
-python append-data-to-kvrocks.py
-# check appended new aof data
+python3 populate-kvrocks.py [--host HOST] [--port PORT] [--password PASSWORD]
[--flushdb FLUSHDB]
+# check generated aof file & user_key.log file
+
+# check consistency
+python3 check_consistency.py [--src_host SRC_HOST] [--src_port SRC_PORT]
[--src_password SRC_PASSWORD]
+ [--dst_host DST_HOST] [--dst_port DST_PORT]
[--dst_password DST_PASSWORD]
+ [--key_file KEY_FILE]
```
diff --git a/utils/kvrocks2redis/tests/append-data-to-kvrocks.py
b/utils/kvrocks2redis/tests/append-data-to-kvrocks.py
deleted file mode 100644
index 6cfb3211..00000000
--- a/utils/kvrocks2redis/tests/append-data-to-kvrocks.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import redis
-
-range=100
-factor=32
-port=6666
-
-r = redis.StrictRedis(host='localhost', port=port, db=0, password='foobared')
-
-# string
-rst = r.set('foo', 2) # update old
-assert rst
-rst = r.set('foo2', 2) # add new
-assert rst
-
-rst = r.setex('foo_ex', 7200, 2)
-assert rst
-
-# zset
-rst = r.zadd('zfoo', 4, 'd')
-assert(rst == 1)
-rst = r.zrem('zfoo', 'd')
-assert(rst == 1)
-
-# list
-rst = r.lset('lfoo', 0, 'a')
-assert(rst == 1)
-rst = r.rpush('lfoo', 'a')
-assert(rst == 5)
-rst = r.lpush('lfoo', 'b')
-assert(rst == 6)
-rst = r.lpop('lfoo')
-assert(rst == 'b')
-rst = r.rpop('lfoo')
-assert(rst == 'a')
-rst = r.ltrim('lfoo', 0, 2)
-assert rst
-
-# set
-rst = r.sadd('sfoo', 'f')
-assert(rst == 1)
-rst = r.srem('sfoo', 'f')
-assert(rst == 1)
-
-# hash
-rst = r.hset('hfoo', 'b', 2)
-assert(rst == 1)
-rst = r.hdel('hfoo', 'b')
-assert(rst == 1)
-
-# bitmap
-rst = r.setbit('bfoo', 0, 0) # update old
-assert(rst == 1)
-rst = r.setbit('bfoo', 900000, 1) # add new
-assert(rst == 0)
-
-# expire cmd
-rst = r.expire('foo', 7200)
-assert rst
-rst = r.expire('zfoo', 7200)
-assert rst
-
-# del cmd
-rst = r.delete('foo')
-assert rst
-rst = r.delete('zfoo')
-assert rst
-
diff --git a/utils/kvrocks2redis/tests/check_consistency.py
b/utils/kvrocks2redis/tests/check_consistency.py
new file mode 100644
index 00000000..3a176cc8
--- /dev/null
+++ b/utils/kvrocks2redis/tests/check_consistency.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import redis
+import codecs
+import argparse
+import time
+
+class RedisComparator:
+    """Compare data between a source server and a destination server.
+
+    Every ``_compare_*`` helper reads a key with the same command from both
+    servers and returns a ``(src_data, dst_data)`` pair; callers treat any
+    inequality as a mismatch.
+    """
+
+    def __init__(self, src_host, src_port, src_password, dst_host, dst_port, dst_password):
+        self.src_cli = self._get_redis_client(src_host, src_port, src_password)
+        self.dst_cli = self._get_redis_client(dst_host, dst_port, dst_password)
+
+    def _get_redis_client(self, host, port, password):
+        return redis.Redis(host=host, port=port, decode_responses=True, password=password)
+
+    def _compare_string_data(self, key):
+        src_data = self.src_cli.get(key)
+        dst_data = self.dst_cli.get(key)
+        return src_data, dst_data
+
+    def _compare_hash_data(self, key):
+        src_data = self.src_cli.hgetall(key)
+        dst_data = self.dst_cli.hgetall(key)
+        return src_data, dst_data
+
+    def _compare_list_data(self, key):
+        src_data = self.src_cli.lrange(key, 0, -1)
+        dst_data = self.dst_cli.lrange(key, 0, -1)
+        return src_data, dst_data
+
+    def _compare_set_data(self, key):
+        src_data = self.src_cli.smembers(key)
+        dst_data = self.dst_cli.smembers(key)
+        return src_data, dst_data
+
+    def _compare_zset_data(self, key):
+        src_data = self.src_cli.zrange(key, 0, -1, withscores=True)
+        dst_data = self.dst_cli.zrange(key, 0, -1, withscores=True)
+        return src_data, dst_data
+
+    def _compare_bitmap_data(self, key, pos):
+        src_data = self.src_cli.getbit(key, pos)
+        dst_data = self.dst_cli.getbit(key, pos)
+        return src_data, dst_data
+
+    def _compare_data(self, keys : list, data_type):
+        """Return (src_data, dst_data) for keys[0] based on its source type.
+
+        For bitmaps, keys[1] is the bit offset to compare.
+        """
+        if data_type == "string":
+            return self._compare_string_data(keys[0])
+        elif data_type == "hash":
+            return self._compare_hash_data(keys[0])
+        elif data_type == "list":
+            return self._compare_list_data(keys[0])
+        elif data_type == "set":
+            return self._compare_set_data(keys[0])
+        elif data_type == "zset":
+            return self._compare_zset_data(keys[0])
+        elif data_type == 'bitmap':
+            return self._compare_bitmap_data(keys[0], keys[1])
+        elif data_type == 'none':
+            # The key is gone on the source (e.g. deleted), so it must be gone on
+            # the destination too; compare both sides' TYPE instead of the
+            # source against itself.
+            return self.src_cli.type(keys[0]), self.dst_cli.type(keys[0])
+        else:
+            raise ValueError(f"Unsupported data type '{data_type}' for key '{keys[0]}'")
+
+    def compare_redis_data(self, key_file=''):
+        """Compare every key listed in key_file, then run a live import check.
+
+        Each line of key_file is "<key>" or, for bitmaps, "<key>-<offset>"
+        (the format populate-kvrocks.py writes). NOTE: lines are split on '-',
+        so key names containing '-' are not supported.
+        """
+        if key_file:
+            with open(key_file, 'rb') as f:
+                for line in f:
+                    keys = codecs.decode(line.strip()).split('-')
+                    data_type = self.src_cli.type(keys[0])
+                    src_data, dst_data = self._compare_data(keys, data_type)
+                    if src_data != dst_data:
+                        raise AssertionError(f"Data mismatch for key '{keys[0]}': source data: '{src_data}' destination data: '{dst_data}'")
+
+        self._import_and_compare(100)
+        print('All tests passed.')
+
+    def _import_and_compare(self, num):
+        """Write `num` rounds of fresh keys to the source and compare each round."""
+        for i in range(num):
+            key = f'key_{i}'
+            value = f'value_{i}'
+            self.src_cli.set(key, value)
+            incr_key = 'incr_key'
+            self.src_cli.incr(incr_key)
+            hash_key = f'hash_key_{i}'
+            hash_value = {'field1': f'field1_value_{i}', 'field2': f'field2_value_{i}'}
+            self.src_cli.hmset(hash_key, hash_value)
+            set_key = f'set_key_{i}'
+            set_value = [f'set_value_{i}_1', f'set_value_{i}_2', f'set_value_{i}_3']
+            self.src_cli.sadd(set_key, *set_value)
+            zset_key = f'zset_key_{i}'
+            zset_value = {f'member_{i}_1': i+1, f'member_{i}_2': i+2, f'member_{i}_3': i+3}
+            self.src_cli.zadd(zset_key, zset_value)
+            # Give kvrocks2redis a moment to replicate before comparing.
+            time.sleep(0.02)
+            keys = [key, incr_key, hash_key, set_key, zset_key]
+            for key in keys:
+                data_type = self.src_cli.type(key)
+                src_data, dst_data = self._compare_data([key], data_type)
+                if src_data != dst_data:
+                    raise AssertionError(f"Data mismatch for key '{key}': source data: '{src_data}' destination data: '{dst_data}'")
+
+
+# CLI entry point: compare the keys listed in --key_file (if given), then run a
+# live write-and-compare round between source and destination.
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description='Redis Comparator')
+ parser.add_argument('--src_host', type=str, default='127.0.0.1',
help='Source Redis host')
+ parser.add_argument('--src_port', type=int, default=6666, help='Source
Redis port')
+ parser.add_argument('--src_password', type=str, default='foobared',
help='Source Redis password')
+ parser.add_argument('--dst_host', type=str, default='127.0.0.1',
help='Destination Redis host')
+ parser.add_argument('--dst_port', type=int, default=6379,
help='Destination Redis port')
+ parser.add_argument('--dst_password', type=str, default='',
help='Destination Redis password')
+ parser.add_argument('--key_file', type=str, help='Path to the file
containing keys to compare')
+
+ args = parser.parse_args()
+
+ redis_comparator = RedisComparator(args.src_host, args.src_port,
args.src_password, args.dst_host, args.dst_port, args.dst_password)
+ redis_comparator.compare_redis_data(args.key_file)
\ No newline at end of file
diff --git a/utils/kvrocks2redis/tests/populate-kvrocks.py
b/utils/kvrocks2redis/tests/populate-kvrocks.py
index cad3d2d4..8883a0df 100644
--- a/utils/kvrocks2redis/tests/populate-kvrocks.py
+++ b/utils/kvrocks2redis/tests/populate-kvrocks.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,57 +17,143 @@
# specific language governing permissions and limitations
# under the License.
+import time
+import argparse
import redis
-
-range=100
-factor=32
-port=6666
-
-r = redis.StrictRedis(host='localhost', port=port, db=0, password='foobared')
-
-# flushall ?
-# rst = r.flushall()
-# assert rst
-
-# string
-rst = r.set('foo', 1)
-assert rst
-
-rst = r.setex('foo_ex', 3600, 1)
-assert rst
-
-# zset
-rst = r.zadd('zfoo', 1, 'a', 2, 'b', 3, 'c')
-assert(rst == 3)
-
-# list
-rst = r.rpush('lfoo', 1, 2, 3, 4)
-assert(rst == 4)
-
-# set
-rst = r.sadd('sfoo', 'a', 'b', 'c', 'd')
-assert(rst == 4)
-
-# hash
-rst = r.hset('hfoo', 'a', 1)
-assert(rst == 1)
-
-# bitmap
-rst = r.setbit('bfoo', 0, 1)
-assert(rst == 0)
-rst = r.setbit('bfoo', 1, 1)
-assert(rst == 0)
-rst = r.setbit('bfoo', 800000, 1)
-assert(rst == 0)
-
-# expire cmd
-rst = r.expire('foo', 3600)
-assert rst
-rst = r.expire('zfoo', 3600)
-assert rst
-
-
-
-
-
-
+import sys
+
+
+# Every key touched by the cases below is appended here, one per line ("key",
+# or "key-offset" for bitmaps); pass it to check_consistency.py --key_file.
+filename = 'user_key.log'
+file = open(filename, 'w')
+
+# Each entry is (group name, [[command tuple, expected reply], ...]).
+# pipeline_execute() sends the command tuples; check() compares each reply
+# with the expected value (or calls it, if it is callable).
+PopulateCases = [
+ ('string', [
+ [('set', 'foo', 1), True],
+ [('setex', 'foo_ex', 3600, 1), True],
+ ]),
+ ('zset', [
+ [('zadd', 'zfoo', 1, 'a', 2, 'b', 3, 'c'), 3]
+ ]),
+ ('list', [
+ [('rpush', 'lfoo', 1, 2, 3, 4), 4]
+ ]),
+ ('set', [
+ [('sadd', 'sfoo', 'a', 'b', 'c', 'd'), 4]
+ ]),
+ ('hash', [
+ [('hset', 'hfoo', 'a', 1), 1]
+ ]),
+ ('bitmap', [
+ [('setbit', 'bfoo', 0, 1), 0],
+ [('setbit', 'bfoo', 1, 1), 0],
+ [('setbit', 'bfoo', 800000, 1), 0]
+ ]),
+ ('expire', [
+ [('expire', 'foo', 3600), True],
+ [('expire', 'zfoo', 3600), True]
+ ])
+]
+
+# Run after PopulateCases: updates, partial removals and deletions of the same keys.
+# Expected replies assume PopulateCases ran first on an otherwise empty keyspace.
+AppendCases = [
+ ('string', [
+ [('set', 'foo', 2), True],
+ [('set', 'foo2', 2), True],
+ [('setex', 'foo_ex', 7200, 2), True]
+ ]),
+ ('zset', [
+ [('zadd', 'zfoo', 4, 'd'), 1],
+ [('zrem', 'zfoo', 'd'), 1]
+ ]),
+ ('list', [
+ [('lset', 'lfoo', 0, 'a'), 1],
+ [('rpush', 'lfoo', 'a'), 5],
+ [('lpush', 'lfoo', 'b'), 6],
+ [('lpop', 'lfoo'), 'b'],
+ [('rpop', 'lfoo'), 'a'],
+ [('ltrim', 'lfoo', 0, 2), True]
+ ]),
+ ('set', [
+ [('sadd', 'sfoo', 'f'), 1],
+ [('srem', 'sfoo', 'f'), 1]
+ ]),
+ ('hash', [
+ [('hset', 'hfoo', 'b', 2), 1],
+ [('hdel', 'hfoo', 'b'), 1]
+ ]),
+ ('bitmap', [
+ [('setbit', 'bfoo', 0, 0), 1],
+ [('setbit', 'bfoo', 900000, 1), 0]
+ ]),
+ ('expire', [
+ [('expire', 'foo', 7200), True],
+ [('expire', 'zfoo', 7200), True]
+ ]),
+ ('delete', [
+ [('del', 'foo'), True],
+ [('del', 'zfoo'), True]
+ ])
+]
+
+def _str2bool(value):
+    """Parse a boolean CLI value such as '1', 'true', 'yes', 'no', 'false', '0'.
+
+    argparse's type=str keeps any non-empty string, so '--flushdb False' used to
+    be truthy and would flush the database.
+    """
+    if isinstance(value, bool):
+        return value
+    lowered = value.strip().lower()
+    if lowered in ('1', 'true', 'yes', 'y', 'on'):
+        return True
+    if lowered in ('0', 'false', 'no', 'n', 'off', ''):
+        return False
+    raise argparse.ArgumentTypeError(f'invalid boolean value: {value!r}')
+
+def parse_args():
+    """Parse the connection options and the --flushdb switch."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--host', default='127.0.0.1', type=str, help='host')
+    parser.add_argument('--port', default=6666, type=int, help='port')
+    parser.add_argument('--password', default='foobared')
+    parser.add_argument('--flushdb', default=False, type=_str2bool, help='need to flushdb')
+
+    return parser.parse_args()
+
+# Check one reply `r` against a case entry `cmd` = [command tuple, expected].
+# A one-element entry is only printed and always passes; a callable `expected`
+# is called with the reply, otherwise the reply must equal it. Prints a FAIL
+# line and returns False on mismatch.
+def check(cmd, r):
+ if len(cmd) == 1:
+ print('EXEC %s' % (str(cmd[0]),))
+ return True
+ if hasattr(cmd[1], '__call__'):
+ isPass = cmd[1](r)
+ else:
+ isPass = r == cmd[1]
+ if not isPass:
+ print('FAIL %s:%s != %s' % (str(cmd[0]), repr(r), repr(cmd[1])))
+ return False
+ return True
+
+# Send one case group through a non-transactional pipeline and check every reply.
+# Each command's key (cmd[0][1]) is logged to `file`; for the 'bitmap' group the
+# bit offset is appended as "key-offset". Any exception counts as a failure.
+def pipeline_execute(client, name, cmds):
+ succ = True
+ p = client.pipeline(transaction=False)
+ try:
+ for cmd in cmds:
+ if (name != 'bitmap'):
+ file.write(cmd[0][1] + '\n')
+ else:
+ file.write(f"{cmd[0][1]}-{cmd[0][2]}" + '\n')
+ p.execute_command(*cmd[0])
+ res = p.execute()
+ for i in range(0, len(cmds)):
+ if not check(cmds[i], res[i]):
+ succ = False
+ except Exception as excp:
+ succ = False
+ print('EXCP %s' % str(excp))
+ return succ
+
+
+
+def run_test(client, cases : list):
+    """Run every case group, print a summary, and return True if all passed."""
+    fails = []
+    for case in cases:
+        if not pipeline_execute(client, case[0], case[1]):
+            fails.append(case[0])
+    if len(fails) > 0:
+        print('******* Some case test fail *******')
+        for cmd in fails:
+            print(cmd)
+    else:
+        print('All case passed.')
+    return not fails
+
+
+if __name__ == '__main__':
+    args = parse_args()
+    client = redis.Redis(host=args.host, port=args.port, decode_responses=True, password=args.password)
+    try:
+        if args.flushdb:
+            client.flushdb()
+        # Run both groups even if the first fails, so the key log is complete.
+        populate_ok = run_test(client, PopulateCases)
+        append_ok = run_test(client, AppendCases)
+    finally:
+        # `file` (the key log) is opened at module level; close it so every
+        # logged key is flushed to disk.
+        file.close()
+    # Non-zero exit status lets scripts and CI detect failing cases.
+    sys.exit(0 if populate_ok and append_ok else 1)