This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 6bbdc553 refactor(replication): reduce memory copying during
incremental synchronization (#2689)
6bbdc553 is described below
commit 6bbdc553b6aaad22a380d614fcd006ec3738e4ce
Author: Rivers <[email protected]>
AuthorDate: Wed Dec 11 18:52:09 2024 +0800
refactor(replication): reduce memory copying during incremental
synchronization (#2689)
---
src/cluster/replication.cc | 56 ++++++++++++++++++++++++++--------------------
src/cluster/replication.h | 3 ++-
src/storage/storage.cc | 16 ++++++++-----
src/storage/storage.h | 6 +++--
4 files changed, 49 insertions(+), 32 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index cd7fe197..b8d10907 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -30,13 +30,16 @@
#include <atomic>
#include <csignal>
#include <future>
+#include <memory>
#include <string>
+#include <string_view>
#include <thread>
#include "commands/error_constants.h"
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
+#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "scope_exit.h"
#include "server/redis_reply.h"
@@ -557,7 +560,6 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
}
ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent
*bev) {
- char *bulk_data = nullptr;
repl_state_.store(kReplConnected, std::memory_order_relaxed);
auto input = bufferevent_get_input(bev);
while (true) {
@@ -576,31 +578,38 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
}
case Incr_batch_data:
// Read bulk data (batch data)
- if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got enough data
- bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input,
static_cast<ssize_t>(incr_bulk_len_ + 2)));
- std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
+ if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) { // If data not enough
+ return CBState::AGAIN;
+ }
+
+ const char *bulk_data =
+ reinterpret_cast<const char *>(evbuffer_pullup(input,
static_cast<ssize_t>(incr_bulk_len_ + 2)));
+ std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
+ evbuffer_drain(input, incr_bulk_len_ + 2);
+ incr_state_ = Incr_batch_size;
+
+ if (bulk_string == "ping") {
// master would send the ping heartbeat packet to check whether the slave was alive or not,
// don't write ping to db here.
- if (bulk_string != "ping") {
- auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data,
incr_bulk_len_));
- if (!s.IsOK()) {
- LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
- << util::StringToHex(bulk_string);
- return CBState::RESTART;
- }
-
- s = parseWriteBatch(bulk_string);
- if (!s.IsOK()) {
- LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(bulk_string)
- << ": " << s.Msg();
- return CBState::RESTART;
- }
- }
- evbuffer_drain(input, incr_bulk_len_ + 2);
- incr_state_ = Incr_batch_size;
- } else {
return CBState::AGAIN;
}
+
+ rocksdb::WriteBatch batch(std::move(bulk_string));
+
+ auto s = storage_->ReplicaApplyWriteBatch(&batch);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
+ << util::StringToHex(batch.Data());
+ return CBState::RESTART;
+ }
+
+ s = parseWriteBatch(batch);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(batch.Data())
+ << ": " << s.Msg();
+ return CBState::RESTART;
+ }
+
break;
}
}
@@ -981,8 +990,7 @@ void ReplicationThread::TimerCB(int, int16_t) {
}
}
-Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
- rocksdb::WriteBatch write_batch(batch_string);
+Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch
&write_batch) {
WriteBatchHandler write_batch_handler;
auto db_status = write_batch.Iterate(&write_batch_handler);
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 8325b162..75c545e0 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -33,6 +33,7 @@
#include "event_util.h"
#include "io_util.h"
+#include "rocksdb/write_batch.h"
#include "server/redis_connection.h"
#include "status.h"
#include "storage/storage.h"
@@ -209,7 +210,7 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
static bool isWrongPsyncNum(std::string_view err);
static bool isUnknownOption(std::string_view err);
- Status parseWriteBatch(const std::string &batch_string);
+ Status parseWriteBatch(const rocksdb::WriteBatch &write_batch);
};
/*
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 8e3140c4..144c43d7 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -43,6 +43,8 @@
#include "redis_db.h"
#include "redis_metadata.h"
#include "rocksdb/cache.h"
+#include "rocksdb/options.h"
+#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "server/server.h"
#include "storage/batch_indexer.h"
@@ -766,22 +768,26 @@ rocksdb::Status Storage::FlushScripts(engine::Context
&ctx, const rocksdb::Write
return Write(ctx, options, batch->GetWriteBatch());
}
-Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
- return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
+Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
+ return applyWriteBatch(default_write_opts_, batch);
}
-Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options,
std::string &&raw_batch) {
+Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *batch) {
if (db_size_limit_reached_) {
return {Status::NotOK, "reach space limit"};
}
- auto batch = rocksdb::WriteBatch(std::move(raw_batch));
- auto s = db_->Write(options, &batch);
+ auto s = db_->Write(options, batch);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}
+Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options,
std::string &&raw_batch) {
+ auto batch = rocksdb::WriteBatch(std::move(raw_batch));
+ return applyWriteBatch(options, &batch);
+}
+
void Storage::RecordStat(StatType type, uint64_t v) {
switch (type) {
case StatType::FlushCount:
diff --git a/src/storage/storage.h b/src/storage/storage.h
index b09d9ef1..a4563eab 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -41,6 +41,7 @@
#include "config/config.h"
#include "lock_manager.h"
#include "observer_or_unique.h"
+#include "rocksdb/write_batch.h"
#include "status.h"
#if defined(__sparc__) || defined(__arm__)
@@ -230,7 +231,7 @@ class Storage {
Status RestoreFromBackup();
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq,
std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
- Status ReplicaApplyWriteBatch(std::string &&raw_batch);
+ Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch);
Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string
&&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();
@@ -380,13 +381,14 @@ class Storage {
// command, so it won't have multi transactions to be executed at the same time.
std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;
- rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
+ rocksdb::WriteOptions default_write_opts_;
// rocksdb used global block cache
std::shared_ptr<rocksdb::Cache> shared_block_cache_;
rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions
&options, rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Status &s);
+ Status applyWriteBatch(const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *batch);
};
/// Context passes fixed snapshot and batch between APIs