This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 6bbdc553 refactor(replication): reduce memory copying during incremental synchronization (#2689)
6bbdc553 is described below

commit 6bbdc553b6aaad22a380d614fcd006ec3738e4ce
Author: Rivers <[email protected]>
AuthorDate: Wed Dec 11 18:52:09 2024 +0800

    refactor(replication): reduce memory copying during incremental synchronization (#2689)
---
 src/cluster/replication.cc | 56 ++++++++++++++++++++++++++--------------------
 src/cluster/replication.h  |  3 ++-
 src/storage/storage.cc     | 16 ++++++++-----
 src/storage/storage.h      |  6 +++--
 4 files changed, 49 insertions(+), 32 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index cd7fe197..b8d10907 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -30,13 +30,16 @@
 #include <atomic>
 #include <csignal>
 #include <future>
+#include <memory>
 #include <string>
+#include <string_view>
 #include <thread>
 
 #include "commands/error_constants.h"
 #include "event_util.h"
 #include "fmt/format.h"
 #include "io_util.h"
+#include "rocksdb/write_batch.h"
 #include "rocksdb_crc32c.h"
 #include "scope_exit.h"
 #include "server/redis_reply.h"
@@ -557,7 +560,6 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
 }
 
 ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *bev) {
-  char *bulk_data = nullptr;
   repl_state_.store(kReplConnected, std::memory_order_relaxed);
   auto input = bufferevent_get_input(bev);
   while (true) {
@@ -576,31 +578,38 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
       }
       case Incr_batch_data:
         // Read bulk data (batch data)
-        if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) {  // We got enough data
-          bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
-          std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
+        if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) {  // If data not enough
+          return CBState::AGAIN;
+        }
+
+        const char *bulk_data =
+            reinterpret_cast<const char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
+        std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
+        evbuffer_drain(input, incr_bulk_len_ + 2);
+        incr_state_ = Incr_batch_size;
+
+        if (bulk_string == "ping") {
           // master would send the ping heartbeat packet to check whether the slave was alive or not,
           // don't write ping to db here.
-          if (bulk_string != "ping") {
-            auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data, incr_bulk_len_));
-            if (!s.IsOK()) {
-              LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
-                         << util::StringToHex(bulk_string);
-              return CBState::RESTART;
-            }
-
-            s = parseWriteBatch(bulk_string);
-            if (!s.IsOK()) {
-              LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(bulk_string)
-                         << ": " << s.Msg();
-              return CBState::RESTART;
-            }
-          }
-          evbuffer_drain(input, incr_bulk_len_ + 2);
-          incr_state_ = Incr_batch_size;
-        } else {
           return CBState::AGAIN;
         }
+
+        rocksdb::WriteBatch batch(std::move(bulk_string));
+
+        auto s = storage_->ReplicaApplyWriteBatch(&batch);
+        if (!s.IsOK()) {
+          LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
+                     << util::StringToHex(batch.Data());
+          return CBState::RESTART;
+        }
+
+        s = parseWriteBatch(batch);
+        if (!s.IsOK()) {
+          LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(batch.Data())
+                     << ": " << s.Msg();
+          return CBState::RESTART;
+        }
+
         break;
     }
   }
@@ -981,8 +990,7 @@ void ReplicationThread::TimerCB(int, int16_t) {
   }
 }
 
-Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
-  rocksdb::WriteBatch write_batch(batch_string);
+Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch) {
   WriteBatchHandler write_batch_handler;
 
   auto db_status = write_batch.Iterate(&write_batch_handler);
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 8325b162..75c545e0 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -33,6 +33,7 @@
 
 #include "event_util.h"
 #include "io_util.h"
+#include "rocksdb/write_batch.h"
 #include "server/redis_connection.h"
 #include "status.h"
 #include "storage/storage.h"
@@ -209,7 +210,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
   static bool isWrongPsyncNum(std::string_view err);
   static bool isUnknownOption(std::string_view err);
 
-  Status parseWriteBatch(const std::string &batch_string);
+  Status parseWriteBatch(const rocksdb::WriteBatch &write_batch);
 };
 
 /*
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 8e3140c4..144c43d7 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -43,6 +43,8 @@
 #include "redis_db.h"
 #include "redis_metadata.h"
 #include "rocksdb/cache.h"
+#include "rocksdb/options.h"
+#include "rocksdb/write_batch.h"
 #include "rocksdb_crc32c.h"
 #include "server/server.h"
 #include "storage/batch_indexer.h"
@@ -766,22 +768,26 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write
   return Write(ctx, options, batch->GetWriteBatch());
 }
 
-Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
-  return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
+Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
+  return applyWriteBatch(default_write_opts_, batch);
 }
 
-Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
+Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch) {
   if (db_size_limit_reached_) {
     return {Status::NotOK, "reach space limit"};
   }
-  auto batch = rocksdb::WriteBatch(std::move(raw_batch));
-  auto s = db_->Write(options, &batch);
+  auto s = db_->Write(options, batch);
   if (!s.ok()) {
     return {Status::NotOK, s.ToString()};
   }
   return Status::OK();
 }
 
+Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
+  auto batch = rocksdb::WriteBatch(std::move(raw_batch));
+  return applyWriteBatch(options, &batch);
+}
+
 void Storage::RecordStat(StatType type, uint64_t v) {
   switch (type) {
     case StatType::FlushCount:
diff --git a/src/storage/storage.h b/src/storage/storage.h
index b09d9ef1..a4563eab 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -41,6 +41,7 @@
 #include "config/config.h"
 #include "lock_manager.h"
 #include "observer_or_unique.h"
+#include "rocksdb/write_batch.h"
 #include "status.h"
 
 #if defined(__sparc__) || defined(__arm__)
@@ -230,7 +231,7 @@ class Storage {
   Status RestoreFromBackup();
   Status RestoreFromCheckpoint();
   Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
-  Status ReplicaApplyWriteBatch(std::string &&raw_batch);
+  Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch);
   Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch);
   rocksdb::SequenceNumber LatestSeqNumber();
 
@@ -380,13 +381,14 @@ class Storage {
   // command, so it won't have multi transactions to be executed at the same time.
   std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;
 
-  rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
+  rocksdb::WriteOptions default_write_opts_;
 
   // rocksdb used global block cache
   std::shared_ptr<rocksdb::Cache> shared_block_cache_;
 
   rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
   void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
+  Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch);
 };
 
 /// Context passes fixed snapshot and batch between APIs

Reply via email to