This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit fe149ca8d82711d942583f2838bb2853c48a5455
Author: Jiashuo <[email protected]>
AuthorDate: Wed Mar 9 10:53:19 2022 +0800

    feat(dup_enhancement#18): support sending mutation logs in batches on the master side and handling the batches on the follower side (#919)
---
 src/base/rrdb_types.cpp                            | 227 ++++++++++++++++-----
 src/idl/rrdb.thrift                                |   5 +
 src/include/rrdb/rrdb_types.h                      |  78 +++++--
 src/server/pegasus_mutation_duplicator.cpp         |  45 ++--
 src/server/pegasus_mutation_duplicator.h           |   2 +
 src/server/pegasus_write_service.cpp               | 124 ++++++-----
 .../test/pegasus_mutation_duplicator_test.cpp      |  70 +++++--
 src/server/test/pegasus_write_service_test.cpp     |  35 ++--
 8 files changed, 413 insertions(+), 173 deletions(-)

diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 4ef46f3..2d334a4 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -4933,37 +4933,154 @@ void scan_response::printTo(std::ostream &out) const
 
 duplicate_request::~duplicate_request() throw() {}
 
-void duplicate_request::__set_timestamp(const int64_t val)
+void duplicate_request::__set_entries(const std::vector<duplicate_entry> &val)
+{
+    this->entries = val;
+}
+
+uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot)
+{
+
+    apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+    uint32_t xfer = 0;
+    std::string fname;
+    ::apache::thrift::protocol::TType ftype;
+    int16_t fid;
+
+    xfer += iprot->readStructBegin(fname);
+
+    using ::apache::thrift::protocol::TProtocolException;
+
+    while (true) {
+        xfer += iprot->readFieldBegin(fname, ftype, fid);
+        if (ftype == ::apache::thrift::protocol::T_STOP) {
+            break;
+        }
+        switch (fid) {
+        case 1:
+            if (ftype == ::apache::thrift::protocol::T_LIST) {
+                {
+                    this->entries.clear();
+                    uint32_t _size154;
+                    ::apache::thrift::protocol::TType _etype157;
+                    xfer += iprot->readListBegin(_etype157, _size154);
+                    this->entries.resize(_size154);
+                    uint32_t _i158;
+                    for (_i158 = 0; _i158 < _size154; ++_i158) {
+                        xfer += this->entries[_i158].read(iprot);
+                    }
+                    xfer += iprot->readListEnd();
+                }
+                this->__isset.entries = true;
+            } else {
+                xfer += iprot->skip(ftype);
+            }
+            break;
+        default:
+            xfer += iprot->skip(ftype);
+            break;
+        }
+        xfer += iprot->readFieldEnd();
+    }
+
+    xfer += iprot->readStructEnd();
+
+    return xfer;
+}
+
+uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol 
*oprot) const
+{
+    uint32_t xfer = 0;
+    apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+    xfer += oprot->writeStructBegin("duplicate_request");
+
+    xfer += oprot->writeFieldBegin("entries", 
::apache::thrift::protocol::T_LIST, 1);
+    {
+        xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT,
+                                      
static_cast<uint32_t>(this->entries.size()));
+        std::vector<duplicate_entry>::const_iterator _iter159;
+        for (_iter159 = this->entries.begin(); _iter159 != 
this->entries.end(); ++_iter159) {
+            xfer += (*_iter159).write(oprot);
+        }
+        xfer += oprot->writeListEnd();
+    }
+    xfer += oprot->writeFieldEnd();
+
+    xfer += oprot->writeFieldStop();
+    xfer += oprot->writeStructEnd();
+    return xfer;
+}
+
+void swap(duplicate_request &a, duplicate_request &b)
+{
+    using ::std::swap;
+    swap(a.entries, b.entries);
+    swap(a.__isset, b.__isset);
+}
+
+duplicate_request::duplicate_request(const duplicate_request &other160)
+{
+    entries = other160.entries;
+    __isset = other160.__isset;
+}
+duplicate_request::duplicate_request(duplicate_request &&other161)
+{
+    entries = std::move(other161.entries);
+    __isset = std::move(other161.__isset);
+}
+duplicate_request &duplicate_request::operator=(const duplicate_request 
&other162)
+{
+    entries = other162.entries;
+    __isset = other162.__isset;
+    return *this;
+}
+duplicate_request &duplicate_request::operator=(duplicate_request &&other163)
+{
+    entries = std::move(other163.entries);
+    __isset = std::move(other163.__isset);
+    return *this;
+}
+void duplicate_request::printTo(std::ostream &out) const
+{
+    using ::apache::thrift::to_string;
+    out << "duplicate_request(";
+    out << "entries=" << to_string(entries);
+    out << ")";
+}
+
+duplicate_entry::~duplicate_entry() throw() {}
+
+void duplicate_entry::__set_timestamp(const int64_t val)
 {
     this->timestamp = val;
     __isset.timestamp = true;
 }
 
-void duplicate_request::__set_task_code(const ::dsn::task_code &val)
+void duplicate_entry::__set_task_code(const ::dsn::task_code &val)
 {
     this->task_code = val;
     __isset.task_code = true;
 }
 
-void duplicate_request::__set_raw_message(const ::dsn::blob &val)
+void duplicate_entry::__set_raw_message(const ::dsn::blob &val)
 {
     this->raw_message = val;
     __isset.raw_message = true;
 }
 
-void duplicate_request::__set_cluster_id(const int8_t val)
+void duplicate_entry::__set_cluster_id(const int8_t val)
 {
     this->cluster_id = val;
     __isset.cluster_id = true;
 }
 
-void duplicate_request::__set_verify_timetag(const bool val)
+void duplicate_entry::__set_verify_timetag(const bool val)
 {
     this->verify_timetag = val;
     __isset.verify_timetag = true;
 }
 
-uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot)
+uint32_t duplicate_entry::read(::apache::thrift::protocol::TProtocol *iprot)
 {
 
     apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -5034,11 +5151,11 @@ uint32_t 
duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot)
     return xfer;
 }
 
-uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol 
*oprot) const
+uint32_t duplicate_entry::write(::apache::thrift::protocol::TProtocol *oprot) 
const
 {
     uint32_t xfer = 0;
     apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
-    xfer += oprot->writeStructBegin("duplicate_request");
+    xfer += oprot->writeStructBegin("duplicate_entry");
 
     if (this->__isset.timestamp) {
         xfer += oprot->writeFieldBegin("timestamp", 
::apache::thrift::protocol::T_I64, 1);
@@ -5070,7 +5187,7 @@ uint32_t 
duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot)
     return xfer;
 }
 
-void swap(duplicate_request &a, duplicate_request &b)
+void swap(duplicate_entry &a, duplicate_entry &b)
 {
     using ::std::swap;
     swap(a.timestamp, b.timestamp);
@@ -5081,48 +5198,48 @@ void swap(duplicate_request &a, duplicate_request &b)
     swap(a.__isset, b.__isset);
 }
 
-duplicate_request::duplicate_request(const duplicate_request &other154)
+duplicate_entry::duplicate_entry(const duplicate_entry &other164)
 {
-    timestamp = other154.timestamp;
-    task_code = other154.task_code;
-    raw_message = other154.raw_message;
-    cluster_id = other154.cluster_id;
-    verify_timetag = other154.verify_timetag;
-    __isset = other154.__isset;
+    timestamp = other164.timestamp;
+    task_code = other164.task_code;
+    raw_message = other164.raw_message;
+    cluster_id = other164.cluster_id;
+    verify_timetag = other164.verify_timetag;
+    __isset = other164.__isset;
 }
-duplicate_request::duplicate_request(duplicate_request &&other155)
+duplicate_entry::duplicate_entry(duplicate_entry &&other165)
 {
-    timestamp = std::move(other155.timestamp);
-    task_code = std::move(other155.task_code);
-    raw_message = std::move(other155.raw_message);
-    cluster_id = std::move(other155.cluster_id);
-    verify_timetag = std::move(other155.verify_timetag);
-    __isset = std::move(other155.__isset);
+    timestamp = std::move(other165.timestamp);
+    task_code = std::move(other165.task_code);
+    raw_message = std::move(other165.raw_message);
+    cluster_id = std::move(other165.cluster_id);
+    verify_timetag = std::move(other165.verify_timetag);
+    __isset = std::move(other165.__isset);
 }
-duplicate_request &duplicate_request::operator=(const duplicate_request 
&other156)
+duplicate_entry &duplicate_entry::operator=(const duplicate_entry &other166)
 {
-    timestamp = other156.timestamp;
-    task_code = other156.task_code;
-    raw_message = other156.raw_message;
-    cluster_id = other156.cluster_id;
-    verify_timetag = other156.verify_timetag;
-    __isset = other156.__isset;
+    timestamp = other166.timestamp;
+    task_code = other166.task_code;
+    raw_message = other166.raw_message;
+    cluster_id = other166.cluster_id;
+    verify_timetag = other166.verify_timetag;
+    __isset = other166.__isset;
     return *this;
 }
-duplicate_request &duplicate_request::operator=(duplicate_request &&other157)
+duplicate_entry &duplicate_entry::operator=(duplicate_entry &&other167)
 {
-    timestamp = std::move(other157.timestamp);
-    task_code = std::move(other157.task_code);
-    raw_message = std::move(other157.raw_message);
-    cluster_id = std::move(other157.cluster_id);
-    verify_timetag = std::move(other157.verify_timetag);
-    __isset = std::move(other157.__isset);
+    timestamp = std::move(other167.timestamp);
+    task_code = std::move(other167.task_code);
+    raw_message = std::move(other167.raw_message);
+    cluster_id = std::move(other167.cluster_id);
+    verify_timetag = std::move(other167.verify_timetag);
+    __isset = std::move(other167.__isset);
     return *this;
 }
-void duplicate_request::printTo(std::ostream &out) const
+void duplicate_entry::printTo(std::ostream &out) const
 {
     using ::apache::thrift::to_string;
-    out << "duplicate_request(";
+    out << "duplicate_entry(";
     out << "timestamp=";
     (__isset.timestamp ? (out << to_string(timestamp)) : (out << "<null>"));
     out << ", "
@@ -5230,30 +5347,30 @@ void swap(duplicate_response &a, duplicate_response &b)
     swap(a.__isset, b.__isset);
 }
 
-duplicate_response::duplicate_response(const duplicate_response &other158)
+duplicate_response::duplicate_response(const duplicate_response &other168)
 {
-    error = other158.error;
-    error_hint = other158.error_hint;
-    __isset = other158.__isset;
+    error = other168.error;
+    error_hint = other168.error_hint;
+    __isset = other168.__isset;
 }
-duplicate_response::duplicate_response(duplicate_response &&other159)
+duplicate_response::duplicate_response(duplicate_response &&other169)
 {
-    error = std::move(other159.error);
-    error_hint = std::move(other159.error_hint);
-    __isset = std::move(other159.__isset);
+    error = std::move(other169.error);
+    error_hint = std::move(other169.error_hint);
+    __isset = std::move(other169.__isset);
 }
-duplicate_response &duplicate_response::operator=(const duplicate_response 
&other160)
+duplicate_response &duplicate_response::operator=(const duplicate_response 
&other170)
 {
-    error = other160.error;
-    error_hint = other160.error_hint;
-    __isset = other160.__isset;
+    error = other170.error;
+    error_hint = other170.error_hint;
+    __isset = other170.__isset;
     return *this;
 }
-duplicate_response &duplicate_response::operator=(duplicate_response 
&&other161)
+duplicate_response &duplicate_response::operator=(duplicate_response 
&&other171)
 {
-    error = std::move(other161.error);
-    error_hint = std::move(other161.error_hint);
-    __isset = std::move(other161.__isset);
+    error = std::move(other171.error);
+    error_hint = std::move(other171.error_hint);
+    __isset = std::move(other171.__isset);
     return *this;
 }
 void duplicate_response::printTo(std::ostream &out) const
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 99316d2..3fc8280 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -298,6 +298,11 @@ struct scan_response
 
 struct duplicate_request
 {
+    1: list<duplicate_entry> entries
+}
+
+struct duplicate_entry
+{
     // The timestamp of this write.
     1: optional i64 timestamp
 
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index 0e36678..7a75c28 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -123,6 +123,8 @@ class scan_response;
 
 class duplicate_request;
 
+class duplicate_entry;
+
 class duplicate_response;
 
 typedef struct _update_request__isset
@@ -2074,7 +2076,53 @@ inline std::ostream &operator<<(std::ostream &out, const 
scan_response &obj)
 
 typedef struct _duplicate_request__isset
 {
-    _duplicate_request__isset()
+    _duplicate_request__isset() : entries(false) {}
+    bool entries : 1;
+} _duplicate_request__isset;
+
+class duplicate_request
+{
+public:
+    duplicate_request(const duplicate_request &);
+    duplicate_request(duplicate_request &&);
+    duplicate_request &operator=(const duplicate_request &);
+    duplicate_request &operator=(duplicate_request &&);
+    duplicate_request() {}
+
+    virtual ~duplicate_request() throw();
+    std::vector<duplicate_entry> entries;
+
+    _duplicate_request__isset __isset;
+
+    void __set_entries(const std::vector<duplicate_entry> &val);
+
+    bool operator==(const duplicate_request &rhs) const
+    {
+        if (!(entries == rhs.entries))
+            return false;
+        return true;
+    }
+    bool operator!=(const duplicate_request &rhs) const { return !(*this == 
rhs); }
+
+    bool operator<(const duplicate_request &) const;
+
+    uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
+    uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
+
+    virtual void printTo(std::ostream &out) const;
+};
+
+void swap(duplicate_request &a, duplicate_request &b);
+
+inline std::ostream &operator<<(std::ostream &out, const duplicate_request 
&obj)
+{
+    obj.printTo(out);
+    return out;
+}
+
+typedef struct _duplicate_entry__isset
+{
+    _duplicate_entry__isset()
         : timestamp(false),
           task_code(false),
           raw_message(false),
@@ -2087,25 +2135,25 @@ typedef struct _duplicate_request__isset
     bool raw_message : 1;
     bool cluster_id : 1;
     bool verify_timetag : 1;
-} _duplicate_request__isset;
+} _duplicate_entry__isset;
 
-class duplicate_request
+class duplicate_entry
 {
 public:
-    duplicate_request(const duplicate_request &);
-    duplicate_request(duplicate_request &&);
-    duplicate_request &operator=(const duplicate_request &);
-    duplicate_request &operator=(duplicate_request &&);
-    duplicate_request() : timestamp(0), cluster_id(0), verify_timetag(0) {}
+    duplicate_entry(const duplicate_entry &);
+    duplicate_entry(duplicate_entry &&);
+    duplicate_entry &operator=(const duplicate_entry &);
+    duplicate_entry &operator=(duplicate_entry &&);
+    duplicate_entry() : timestamp(0), cluster_id(0), verify_timetag(0) {}
 
-    virtual ~duplicate_request() throw();
+    virtual ~duplicate_entry() throw();
     int64_t timestamp;
     ::dsn::task_code task_code;
     ::dsn::blob raw_message;
     int8_t cluster_id;
     bool verify_timetag;
 
-    _duplicate_request__isset __isset;
+    _duplicate_entry__isset __isset;
 
     void __set_timestamp(const int64_t val);
 
@@ -2117,7 +2165,7 @@ public:
 
     void __set_verify_timetag(const bool val);
 
-    bool operator==(const duplicate_request &rhs) const
+    bool operator==(const duplicate_entry &rhs) const
     {
         if (__isset.timestamp != rhs.__isset.timestamp)
             return false;
@@ -2141,9 +2189,9 @@ public:
             return false;
         return true;
     }
-    bool operator!=(const duplicate_request &rhs) const { return !(*this == 
rhs); }
+    bool operator!=(const duplicate_entry &rhs) const { return !(*this == 
rhs); }
 
-    bool operator<(const duplicate_request &) const;
+    bool operator<(const duplicate_entry &) const;
 
     uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
     uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
@@ -2151,9 +2199,9 @@ public:
     virtual void printTo(std::ostream &out) const;
 };
 
-void swap(duplicate_request &a, duplicate_request &b);
+void swap(duplicate_entry &a, duplicate_entry &b);
 
-inline std::ostream &operator<<(std::ostream &out, const duplicate_request 
&obj)
+inline std::ostream &operator<<(std::ostream &out, const duplicate_entry &obj)
 {
     obj.printTo(out);
     return out;
diff --git a/src/server/pegasus_mutation_duplicator.cpp 
b/src/server/pegasus_mutation_duplicator.cpp
index 7244f4c..6fc4ecf 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -42,6 +42,11 @@ namespace replication {
 namespace pegasus {
 namespace server {
 
+DSN_DEFINE_uint32("pegasus",
+                  duplicate_log_batch_megabytes,
+                  4,
+                  "send mutation log batch size per rpc");
+
 using namespace dsn::literals::chrono_literals;
 
 /*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob 
&data)
@@ -146,9 +151,9 @@ void 
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
         // errors are acceptable.
         // TODO(wutao1): print the entire request for future debugging.
         if (dsn::rand::next_double01() <= 0.01) {
-            derror_replica("duplicate_rpc failed: {} [timestamp:{}]",
+            derror_replica("duplicate_rpc failed: {} [size:{}]",
                            err == dsn::ERR_OK ? 
_client->get_error_string(perr) : err.to_string(),
-                           rpc.request().timestamp);
+                           rpc.request().entries.size());
         }
         // duplicating an illegal write to server is unacceptable, fail fast.
         dassert_replica(perr != PERR_INVALID_ARGUMENT, 
rpc.response().error_hint);
@@ -184,9 +189,14 @@ void 
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
 {
     _total_shipped_size = 0;
 
+    auto batch_request = dsn::make_unique<dsn::apps::duplicate_request>();
+    uint batch_bytes = 0;
+    int cur_count = 0;
+
     for (auto mut : muts) {
         // mut: 0=timestamp, 1=rpc_code, 2=raw_message
 
+        cur_count++;
         dsn::task_code rpc_code = std::get<1>(mut);
         dsn::blob raw_message = std::get<2>(mut);
         auto dreq = dsn::make_unique<dsn::apps::duplicate_request>();
@@ -197,18 +207,29 @@ void 
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
             // destinations. A DUPLICATE is meant to be targeting only one 
cluster.
             continue;
         } else {
-            dreq->__set_raw_message(raw_message);
-            dreq->__set_task_code(rpc_code);
-            dreq->__set_timestamp(std::get<0>(mut));
-            dreq->__set_cluster_id(get_current_cluster_id());
+            dsn::apps::duplicate_entry entry;
+            entry.__set_raw_message(raw_message);
+            entry.__set_task_code(rpc_code);
+            entry.__set_timestamp(std::get<0>(mut));
+            entry.__set_cluster_id(get_current_cluster_id());
+            batch_request->entries.emplace_back(std::move(entry));
+            batch_bytes += raw_message.length();
         }
 
-        uint64_t hash = get_hash_from_request(rpc_code, raw_message);
-        duplicate_rpc rpc(std::move(dreq),
-                          dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
-                          10_s, // TODO(wutao1): configurable timeout.
-                          hash);
-        _inflights[hash].push_back(std::move(rpc));
+        if (batch_bytes >= (FLAGS_duplicate_log_batch_megabytes << 20) ||
+            cur_count == muts.size()) {
+            // since all the plog's mutations of replica belong to same gpid 
though the hash of
+            // mutation is different, use the last mutation of one batch to 
get and represents the
+            // current hash value, it will still send to remote correct replica
+            uint64_t hash = get_hash_from_request(rpc_code, raw_message);
+            duplicate_rpc rpc(std::move(batch_request),
+                              dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
+                              100_s, // TODO(wutao1): configurable timeout.
+                              hash);
+            _inflights[hash].push_back(std::move(rpc));
+            batch_request = dsn::make_unique<dsn::apps::duplicate_request>();
+            batch_bytes = 0;
+        }
     }
 
     if (_inflights.empty()) {
diff --git a/src/server/pegasus_mutation_duplicator.h 
b/src/server/pegasus_mutation_duplicator.h
index 4d0b175..daeaf34 100644
--- a/src/server/pegasus_mutation_duplicator.h
+++ b/src/server/pegasus_mutation_duplicator.h
@@ -22,11 +22,13 @@
 #include <dsn/dist/replication/mutation_duplicator.h>
 #include <dsn/dist/replication/replica_base.h>
 #include <rrdb/rrdb.code.definition.h>
+#include <dsn/utility/flags.h>
 
 #include "client_lib/pegasus_client_factory_impl.h"
 
 namespace pegasus {
 namespace server {
+DSN_DECLARE_uint32(duplicate_log_batch_megabytes);
 
 using namespace dsn::literals::chrono_literals;
 
diff --git a/src/server/pegasus_write_service.cpp 
b/src/server/pegasus_write_service.cpp
index 48eaeb7..1a13753 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -308,69 +308,83 @@ void pegasus_write_service::clear_up_batch_states()
 }
 
 int pegasus_write_service::duplicate(int64_t decree,
-                                     const dsn::apps::duplicate_request 
&request,
+                                     const dsn::apps::duplicate_request 
&requests,
                                      dsn::apps::duplicate_response &resp)
 {
     // Verifies the cluster_id.
-    if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) {
-        resp.__set_error(rocksdb::Status::kInvalidArgument);
-        resp.__set_error_hint("request cluster id is unconfigured");
-        return empty_put(decree);
-    }
-    if (request.cluster_id == get_current_cluster_id()) {
-        resp.__set_error(rocksdb::Status::kInvalidArgument);
-        resp.__set_error_hint("self-duplicating");
-        return empty_put(decree);
-    }
-
-    _pfc_duplicate_qps->increment();
-    auto cleanup = dsn::defer([this, &request]() {
-        uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000;
-        if (latency_ms > _dup_lagging_write_threshold_ms) {
-            _pfc_dup_lagging_writes->increment();
+    for (const auto &request : requests.entries) {
+        if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) {
+            resp.__set_error(rocksdb::Status::kInvalidArgument);
+            resp.__set_error_hint("request cluster id is unconfigured");
+            return empty_put(decree);
         }
-        _pfc_dup_time_lag->set(latency_ms);
-    });
-    dsn::message_ex *write = dsn::from_blob_to_received_msg(request.task_code, 
request.raw_message);
-    bool is_delete = request.task_code == 
dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
-                     request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE;
-    auto remote_timetag = generate_timetag(request.timestamp, 
request.cluster_id, is_delete);
-    auto ctx = db_write_context::create_duplicate(decree, remote_timetag, 
request.verify_timetag);
-
-    if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
-        multi_put_rpc rpc(write);
-        resp.__set_error(_impl->multi_put(ctx, rpc.request(), rpc.response()));
-        return resp.error;
-    }
-    if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
-        multi_remove_rpc rpc(write);
-        resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(), 
rpc.response()));
-        return resp.error;
-    }
-    put_rpc put;
-    remove_rpc remove;
-    if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
-        request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
-        int err = 0;
-        if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
-            put = put_rpc(write);
-            err = _impl->batch_put(ctx, put.request(), put.response());
+        if (request.cluster_id == get_current_cluster_id()) {
+            resp.__set_error(rocksdb::Status::kInvalidArgument);
+            resp.__set_error_hint("self-duplicating");
+            return empty_put(decree);
         }
-        if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
-            remove = remove_rpc(write);
-            err = _impl->batch_remove(ctx.decree, remove.request(), 
remove.response());
+
+        _pfc_duplicate_qps->increment();
+        auto cleanup = dsn::defer([this, &request]() {
+            uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000;
+            if (latency_ms > _dup_lagging_write_threshold_ms) {
+                _pfc_dup_lagging_writes->increment();
+            }
+            _pfc_dup_time_lag->set(latency_ms);
+        });
+        dsn::message_ex *write =
+            dsn::from_blob_to_received_msg(request.task_code, 
request.raw_message);
+        bool is_delete = request.task_code == 
dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
+                         request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE;
+        auto remote_timetag = generate_timetag(request.timestamp, 
request.cluster_id, is_delete);
+        auto ctx =
+            db_write_context::create_duplicate(decree, remote_timetag, 
request.verify_timetag);
+
+        if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
+            multi_put_rpc rpc(write);
+            resp.__set_error(_impl->multi_put(ctx, rpc.request(), 
rpc.response()));
+            if (resp.error != rocksdb::Status::kOk) {
+                return resp.error;
+            }
+            continue;
         }
-        if (!err) {
-            err = _impl->batch_commit(ctx.decree);
-        } else {
-            _impl->batch_abort(ctx.decree, err);
+        if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
+            multi_remove_rpc rpc(write);
+            resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(), 
rpc.response()));
+            if (resp.error != rocksdb::Status::kOk) {
+                return resp.error;
+            }
+            continue;
         }
-        resp.__set_error(err);
-        return resp.error;
+        put_rpc put;
+        remove_rpc remove;
+        if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
+            request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
+            int err = 0;
+            if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
+                put = put_rpc(write);
+                err = _impl->batch_put(ctx, put.request(), put.response());
+            }
+            if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
+                remove = remove_rpc(write);
+                err = _impl->batch_remove(ctx.decree, remove.request(), 
remove.response());
+            }
+            if (!err) {
+                err = _impl->batch_commit(ctx.decree);
+            } else {
+                _impl->batch_abort(ctx.decree, err);
+            }
+            resp.__set_error(err);
+            if (resp.error != rocksdb::Status::kOk) {
+                return resp.error;
+            }
+            continue;
+        }
+        resp.__set_error(rocksdb::Status::kInvalidArgument);
+        resp.__set_error_hint(fmt::format("unrecognized task code {}", 
request.task_code));
+        return empty_put(ctx.decree);
     }
-    resp.__set_error(rocksdb::Status::kInvalidArgument);
-    resp.__set_error_hint(fmt::format("unrecognized task code {}", 
request.task_code));
-    return empty_put(ctx.decree);
+    return resp.error;
 }
 
 int pegasus_write_service::ingest_files(int64_t decree,
diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp 
b/src/server/test/pegasus_mutation_duplicator_test.cpp
index 2dab5a4..f61fe69 100644
--- a/src/server/test/pegasus_mutation_duplicator_test.cpp
+++ b/src/server/test/pegasus_mutation_duplicator_test.cpp
@@ -19,6 +19,7 @@
 
 #include "server/pegasus_mutation_duplicator.h"
 #include "base/pegasus_rpc_types.h"
+#include "base/value_schema_manager.h"
 #include "pegasus_server_test_base.h"
 
 #include <gtest/gtest.h>
@@ -49,19 +50,27 @@ public:
         auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
         duplicator->set_task_environment(&_env);
 
+        std::string sort_key;
+        for (int i = 0; i < 1000; i++) {
+            sort_key = fmt::format("{}_{}", sort_key, i);
+        }
+
         mutation_tuple_set muts;
-        for (uint64_t i = 0; i < 100; i++) {
+        uint total_bytes = 0;
+        for (uint64_t i = 0; i < 4000; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
 
             dsn::apps::update_request request;
-            pegasus::pegasus_generate_key(request.key, std::string("hash"), 
std::string("sort"));
+            pegasus::pegasus_generate_key(request.key, std::string("hash"), 
sort_key);
             dsn::message_ptr msg =
                 dsn::from_thrift_request_to_received_message(request, 
dsn::apps::RPC_RRDB_RRDB_PUT);
             auto data = dsn::move_message_to_blob(msg.get());
 
             muts.insert(std::make_tuple(ts, code, data));
+            total_bytes += data.length();
         }
+        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes 
<< 20) + 1;
 
         size_t total_shipped_size = 0;
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator 
*>(duplicator.get());
@@ -69,14 +78,13 @@ public:
         {
             duplicator->duplicate(muts, [](size_t) {});
 
-            size_t total_size = 100;
-            while (total_size > 0) {
+            while (batch_count > 0) {
                 // ensure mutations having the same hash are sending 
sequentially.
                 ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
                 ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
 
-                total_size--;
-                ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 
total_size);
+                batch_count--;
+                ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 
batch_count);
 
                 auto rpc = duplicate_rpc::mail_box().back();
                 duplicate_rpc::mail_box().pop_back();
@@ -106,19 +114,27 @@ public:
         auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
         duplicator->set_task_environment(&_env);
 
+        std::string sort_key;
+        for (int i = 0; i < 1000; i++) {
+            sort_key = fmt::format("{}_{}", sort_key, i);
+        }
+
         mutation_tuple_set muts;
-        for (uint64_t i = 0; i < 10; i++) {
+        uint total_bytes = 0;
+        for (uint64_t i = 0; i < 4000; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
 
             dsn::apps::update_request request;
-            pegasus::pegasus_generate_key(request.key, std::string("hash"), 
std::string("sort"));
+            pegasus::pegasus_generate_key(request.key, std::string("hash"), 
sort_key);
             dsn::message_ptr msg =
                 dsn::from_thrift_request_to_received_message(request, 
dsn::apps::RPC_RRDB_RRDB_PUT);
             auto data = dsn::move_message_to_blob(msg.get());
 
             muts.insert(std::make_tuple(ts, code, data));
+            total_bytes += data.length();
         }
+        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes 
<< 20) + 1;
 
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator 
*>(duplicator.get());
         RPC_MOCKING(duplicate_rpc)
@@ -127,7 +143,7 @@ public:
 
             auto rpc = duplicate_rpc::mail_box().back();
             duplicate_rpc::mail_box().pop_back();
-            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 
batch_count - 1);
 
             // failed
             duplicator_impl->on_duplicate_reply(
@@ -139,7 +155,7 @@ public:
             // retry infinitely
             ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
             ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
-            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 
batch_count - 1);
             duplicate_rpc::mail_box().clear();
 
             // with other error
@@ -148,7 +164,7 @@ public:
             _tracker.wait_outstanding_tasks();
             ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
             ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
-            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 
batch_count - 1);
             duplicate_rpc::mail_box().clear();
 
             // with other error
@@ -158,7 +174,7 @@ public:
             _tracker.wait_outstanding_tasks();
             ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
             ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
-            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 
batch_count - 1);
             duplicate_rpc::mail_box().clear();
         }
     }
@@ -169,30 +185,38 @@ public:
         auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
         duplicator->set_task_environment(&_env);
 
-        size_t total_size = 3000;
+        size_t total_size = 4000;
+        std::string sort_key;
+        for (int i = 0; i < 1000; i++) {
+            sort_key = fmt::format("{}_{}", sort_key, i);
+        }
+
         mutation_tuple_set muts;
+        uint total_bytes = 0;
         for (uint64_t i = 0; i < total_size; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
 
             dsn::apps::update_request request;
             pegasus::pegasus_generate_key(
-                request.key, std::string("hash") + std::to_string(i), 
std::string("sort"));
+                request.key, std::string("hash") + std::to_string(i), 
sort_key);
             dsn::message_ptr msg = 
dsn::from_thrift_request_to_received_message(request, code);
             auto data = dsn::move_message_to_blob(msg.get());
 
             muts.insert(std::make_tuple(ts, code, data));
+            total_bytes += data.length();
         }
+        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes 
<< 20) + 1;
 
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator 
*>(duplicator.get());
         RPC_MOCKING(duplicate_rpc)
         {
             duplicator->duplicate(muts, [](size_t) {});
 
-            // ensure each bucket has only 1 request and each request is
+            // ensure each bucket has only 1 batch request and each request is
             // isolated with others.
-            ASSERT_EQ(duplicator_impl->_inflights.size(), total_size);
-            ASSERT_EQ(duplicate_rpc::mail_box().size(), total_size);
+            ASSERT_EQ(duplicator_impl->_inflights.size(), batch_count);
+            ASSERT_EQ(duplicate_rpc::mail_box().size(), batch_count);
             for (const auto &ents : duplicator_impl->_inflights) {
                 ASSERT_EQ(ents.second.size(), 0);
             }
@@ -223,7 +247,9 @@ public:
 private:
     static uint64_t get_hash(const duplicate_rpc &rpc)
     {
-        return get_hash_from_request(rpc.request().task_code, 
rpc.request().raw_message);
+        auto size = rpc.request().entries.size();
+        return get_hash_from_request(rpc.request().entries[size - 1].task_code,
+                                     rpc.request().entries[size - 
1].raw_message);
     }
 };
 
@@ -322,9 +348,11 @@ TEST_F(pegasus_mutation_duplicator_test, 
duplicate_duplicate)
 
     // a duplicate from onebox2
     dsn::apps::duplicate_request dup;
-    dup.cluster_id = 2;
-    dup.raw_message = data;
-    dup.timestamp = 200;
+    dsn::apps::duplicate_entry entry;
+    entry.cluster_id = 2;
+    entry.raw_message = data;
+    entry.timestamp = 200;
+    dup.entries.emplace_back(entry);
     msg = dsn::from_thrift_request_to_received_message(dup, 
dsn::apps::RPC_RRDB_RRDB_DUPLICATE);
     data = dsn::move_message_to_blob(msg.get());
 
diff --git a/src/server/test/pegasus_write_service_test.cpp 
b/src/server/test/pegasus_write_service_test.cpp
index 839da27..9bd3d7f 100644
--- a/src/server/test/pegasus_write_service_test.cpp
+++ b/src/server/test/pegasus_write_service_test.cpp
@@ -220,8 +220,9 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched)
     }
 
     dsn::apps::duplicate_request duplicate;
-    duplicate.timestamp = 1000;
-    duplicate.cluster_id = 2;
+    dsn::apps::duplicate_entry entry;
+    entry.timestamp = 1000;
+    entry.cluster_id = 2;
     dsn::apps::duplicate_response resp;
 
     {
@@ -233,9 +234,9 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched)
         }
         dsn::message_ptr mput_msg = pegasus::create_multi_put_request(mput);
 
-        duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_PUT;
-        duplicate.raw_message = dsn::move_message_to_blob(mput_msg.get());
-
+        entry.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_PUT;
+        entry.raw_message = dsn::move_message_to_blob(mput_msg.get());
+        duplicate.entries.emplace_back(entry);
         _write_svc->duplicate(1, duplicate, resp);
         ASSERT_EQ(resp.error, 0);
     }
@@ -248,8 +249,8 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched)
         }
         dsn::message_ptr mremove_msg = 
pegasus::create_multi_remove_request(mremove);
 
-        duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE;
-        duplicate.raw_message = dsn::move_message_to_blob(mremove_msg.get());
+        entry.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE;
+        entry.raw_message = dsn::move_message_to_blob(mremove_msg.get());
 
         _write_svc->duplicate(1, duplicate, resp);
         ASSERT_EQ(resp.error, 0);
@@ -270,8 +271,9 @@ TEST_F(pegasus_write_service_test, duplicate_batched)
 
     {
         dsn::apps::duplicate_request duplicate;
-        duplicate.timestamp = 1000;
-        duplicate.cluster_id = 2;
+        dsn::apps::duplicate_entry entry;
+        entry.timestamp = 1000;
+        entry.cluster_id = 2;
         dsn::apps::duplicate_response resp;
 
         for (int i = 0; i < kv_num; i++) {
@@ -280,8 +282,9 @@ TEST_F(pegasus_write_service_test, duplicate_batched)
             request.value.assign(value[i].data(), 0, value[i].size());
 
             dsn::message_ptr msg_ptr = pegasus::create_put_request(request);
-            duplicate.raw_message = dsn::move_message_to_blob(msg_ptr.get());
-            duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_PUT;
+            entry.raw_message = dsn::move_message_to_blob(msg_ptr.get());
+            entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT;
+            duplicate.entries.emplace_back(entry);
             _write_svc->duplicate(1, duplicate, resp);
             ASSERT_EQ(resp.error, 0);
         }
@@ -296,8 +299,10 @@ TEST_F(pegasus_write_service_test, 
illegal_duplicate_request)
 
     // cluster=13 is from nowhere
     dsn::apps::duplicate_request duplicate;
-    duplicate.cluster_id = 13;
-    duplicate.timestamp = 10;
+    dsn::apps::duplicate_entry entry;
+    entry.timestamp = 1000;
+    entry.cluster_id = 2;
+    duplicate.entries.emplace_back(entry);
     dsn::apps::duplicate_response resp;
 
     dsn::apps::update_request request;
@@ -305,8 +310,8 @@ TEST_F(pegasus_write_service_test, 
illegal_duplicate_request)
     request.value.assign(value.data(), 0, value.size());
 
     dsn::message_ptr msg_ptr = pegasus::create_put_request(request); // auto 
release memory
-    duplicate.raw_message = dsn::move_message_to_blob(msg_ptr.get());
-    duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_PUT;
+    entry.raw_message = dsn::move_message_to_blob(msg_ptr.get());
+    entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT;
     _write_svc->duplicate(1, duplicate, resp);
     ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to