This is an automated email from the ASF dual-hosted git repository.
jiashuo pushed a commit to branch duplication_dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/duplication_dev by this push:
new b4af363 feat(dup_enhancement#18): support batch send log on master
side and handle it on follower side (#919)
b4af363 is described below
commit b4af36376cfc81a172a0e709158db1eee9cf5979
Author: Jiashuo <[email protected]>
AuthorDate: Wed Mar 9 10:53:19 2022 +0800
feat(dup_enhancement#18): support batch send log on master side and handle
it on follower side (#919)
---
src/base/rrdb_types.cpp | 227 ++++++++++++++++-----
src/idl/rrdb.thrift | 5 +
src/include/rrdb/rrdb_types.h | 78 +++++--
src/server/pegasus_mutation_duplicator.cpp | 45 ++--
src/server/pegasus_mutation_duplicator.h | 2 +
src/server/pegasus_write_service.cpp | 124 ++++++-----
.../test/pegasus_mutation_duplicator_test.cpp | 70 +++++--
src/server/test/pegasus_write_service_test.cpp | 35 ++--
8 files changed, 413 insertions(+), 173 deletions(-)
diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 4ef46f3..2d334a4 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -4933,37 +4933,154 @@ void scan_response::printTo(std::ostream &out) const
duplicate_request::~duplicate_request() throw() {}
-void duplicate_request::__set_timestamp(const int64_t val)
+void duplicate_request::__set_entries(const std::vector<duplicate_entry> &val)
+{
+ this->entries = val;
+}
+
+uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot)
+{
+
+ apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+ while (true) {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid) {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_LIST) {
+ {
+ this->entries.clear();
+ uint32_t _size154;
+ ::apache::thrift::protocol::TType _etype157;
+ xfer += iprot->readListBegin(_etype157, _size154);
+ this->entries.resize(_size154);
+ uint32_t _i158;
+ for (_i158 = 0; _i158 < _size154; ++_i158) {
+ xfer += this->entries[_i158].read(iprot);
+ }
+ xfer += iprot->readListEnd();
+ }
+ this->__isset.entries = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol
*oprot) const
+{
+ uint32_t xfer = 0;
+ apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+ xfer += oprot->writeStructBegin("duplicate_request");
+
+ xfer += oprot->writeFieldBegin("entries",
::apache::thrift::protocol::T_LIST, 1);
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT,
+
static_cast<uint32_t>(this->entries.size()));
+ std::vector<duplicate_entry>::const_iterator _iter159;
+ for (_iter159 = this->entries.begin(); _iter159 !=
this->entries.end(); ++_iter159) {
+ xfer += (*_iter159).write(oprot);
+ }
+ xfer += oprot->writeListEnd();
+ }
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(duplicate_request &a, duplicate_request &b)
+{
+ using ::std::swap;
+ swap(a.entries, b.entries);
+ swap(a.__isset, b.__isset);
+}
+
+duplicate_request::duplicate_request(const duplicate_request &other160)
+{
+ entries = other160.entries;
+ __isset = other160.__isset;
+}
+duplicate_request::duplicate_request(duplicate_request &&other161)
+{
+ entries = std::move(other161.entries);
+ __isset = std::move(other161.__isset);
+}
+duplicate_request &duplicate_request::operator=(const duplicate_request
&other162)
+{
+ entries = other162.entries;
+ __isset = other162.__isset;
+ return *this;
+}
+duplicate_request &duplicate_request::operator=(duplicate_request &&other163)
+{
+ entries = std::move(other163.entries);
+ __isset = std::move(other163.__isset);
+ return *this;
+}
+void duplicate_request::printTo(std::ostream &out) const
+{
+ using ::apache::thrift::to_string;
+ out << "duplicate_request(";
+ out << "entries=" << to_string(entries);
+ out << ")";
+}
+
+duplicate_entry::~duplicate_entry() throw() {}
+
+void duplicate_entry::__set_timestamp(const int64_t val)
{
this->timestamp = val;
__isset.timestamp = true;
}
-void duplicate_request::__set_task_code(const ::dsn::task_code &val)
+void duplicate_entry::__set_task_code(const ::dsn::task_code &val)
{
this->task_code = val;
__isset.task_code = true;
}
-void duplicate_request::__set_raw_message(const ::dsn::blob &val)
+void duplicate_entry::__set_raw_message(const ::dsn::blob &val)
{
this->raw_message = val;
__isset.raw_message = true;
}
-void duplicate_request::__set_cluster_id(const int8_t val)
+void duplicate_entry::__set_cluster_id(const int8_t val)
{
this->cluster_id = val;
__isset.cluster_id = true;
}
-void duplicate_request::__set_verify_timetag(const bool val)
+void duplicate_entry::__set_verify_timetag(const bool val)
{
this->verify_timetag = val;
__isset.verify_timetag = true;
}
-uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot)
+uint32_t duplicate_entry::read(::apache::thrift::protocol::TProtocol *iprot)
{
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -5034,11 +5151,11 @@ uint32_t
duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot)
return xfer;
}
-uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol
*oprot) const
+uint32_t duplicate_entry::write(::apache::thrift::protocol::TProtocol *oprot)
const
{
uint32_t xfer = 0;
apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
- xfer += oprot->writeStructBegin("duplicate_request");
+ xfer += oprot->writeStructBegin("duplicate_entry");
if (this->__isset.timestamp) {
xfer += oprot->writeFieldBegin("timestamp",
::apache::thrift::protocol::T_I64, 1);
@@ -5070,7 +5187,7 @@ uint32_t
duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot)
return xfer;
}
-void swap(duplicate_request &a, duplicate_request &b)
+void swap(duplicate_entry &a, duplicate_entry &b)
{
using ::std::swap;
swap(a.timestamp, b.timestamp);
@@ -5081,48 +5198,48 @@ void swap(duplicate_request &a, duplicate_request &b)
swap(a.__isset, b.__isset);
}
-duplicate_request::duplicate_request(const duplicate_request &other154)
+duplicate_entry::duplicate_entry(const duplicate_entry &other164)
{
- timestamp = other154.timestamp;
- task_code = other154.task_code;
- raw_message = other154.raw_message;
- cluster_id = other154.cluster_id;
- verify_timetag = other154.verify_timetag;
- __isset = other154.__isset;
+ timestamp = other164.timestamp;
+ task_code = other164.task_code;
+ raw_message = other164.raw_message;
+ cluster_id = other164.cluster_id;
+ verify_timetag = other164.verify_timetag;
+ __isset = other164.__isset;
}
-duplicate_request::duplicate_request(duplicate_request &&other155)
+duplicate_entry::duplicate_entry(duplicate_entry &&other165)
{
- timestamp = std::move(other155.timestamp);
- task_code = std::move(other155.task_code);
- raw_message = std::move(other155.raw_message);
- cluster_id = std::move(other155.cluster_id);
- verify_timetag = std::move(other155.verify_timetag);
- __isset = std::move(other155.__isset);
+ timestamp = std::move(other165.timestamp);
+ task_code = std::move(other165.task_code);
+ raw_message = std::move(other165.raw_message);
+ cluster_id = std::move(other165.cluster_id);
+ verify_timetag = std::move(other165.verify_timetag);
+ __isset = std::move(other165.__isset);
}
-duplicate_request &duplicate_request::operator=(const duplicate_request
&other156)
+duplicate_entry &duplicate_entry::operator=(const duplicate_entry &other166)
{
- timestamp = other156.timestamp;
- task_code = other156.task_code;
- raw_message = other156.raw_message;
- cluster_id = other156.cluster_id;
- verify_timetag = other156.verify_timetag;
- __isset = other156.__isset;
+ timestamp = other166.timestamp;
+ task_code = other166.task_code;
+ raw_message = other166.raw_message;
+ cluster_id = other166.cluster_id;
+ verify_timetag = other166.verify_timetag;
+ __isset = other166.__isset;
return *this;
}
-duplicate_request &duplicate_request::operator=(duplicate_request &&other157)
+duplicate_entry &duplicate_entry::operator=(duplicate_entry &&other167)
{
- timestamp = std::move(other157.timestamp);
- task_code = std::move(other157.task_code);
- raw_message = std::move(other157.raw_message);
- cluster_id = std::move(other157.cluster_id);
- verify_timetag = std::move(other157.verify_timetag);
- __isset = std::move(other157.__isset);
+ timestamp = std::move(other167.timestamp);
+ task_code = std::move(other167.task_code);
+ raw_message = std::move(other167.raw_message);
+ cluster_id = std::move(other167.cluster_id);
+ verify_timetag = std::move(other167.verify_timetag);
+ __isset = std::move(other167.__isset);
return *this;
}
-void duplicate_request::printTo(std::ostream &out) const
+void duplicate_entry::printTo(std::ostream &out) const
{
using ::apache::thrift::to_string;
- out << "duplicate_request(";
+ out << "duplicate_entry(";
out << "timestamp=";
(__isset.timestamp ? (out << to_string(timestamp)) : (out << "<null>"));
out << ", "
@@ -5230,30 +5347,30 @@ void swap(duplicate_response &a, duplicate_response &b)
swap(a.__isset, b.__isset);
}
-duplicate_response::duplicate_response(const duplicate_response &other158)
+duplicate_response::duplicate_response(const duplicate_response &other168)
{
- error = other158.error;
- error_hint = other158.error_hint;
- __isset = other158.__isset;
+ error = other168.error;
+ error_hint = other168.error_hint;
+ __isset = other168.__isset;
}
-duplicate_response::duplicate_response(duplicate_response &&other159)
+duplicate_response::duplicate_response(duplicate_response &&other169)
{
- error = std::move(other159.error);
- error_hint = std::move(other159.error_hint);
- __isset = std::move(other159.__isset);
+ error = std::move(other169.error);
+ error_hint = std::move(other169.error_hint);
+ __isset = std::move(other169.__isset);
}
-duplicate_response &duplicate_response::operator=(const duplicate_response
&other160)
+duplicate_response &duplicate_response::operator=(const duplicate_response
&other170)
{
- error = other160.error;
- error_hint = other160.error_hint;
- __isset = other160.__isset;
+ error = other170.error;
+ error_hint = other170.error_hint;
+ __isset = other170.__isset;
return *this;
}
-duplicate_response &duplicate_response::operator=(duplicate_response
&&other161)
+duplicate_response &duplicate_response::operator=(duplicate_response
&&other171)
{
- error = std::move(other161.error);
- error_hint = std::move(other161.error_hint);
- __isset = std::move(other161.__isset);
+ error = std::move(other171.error);
+ error_hint = std::move(other171.error_hint);
+ __isset = std::move(other171.__isset);
return *this;
}
void duplicate_response::printTo(std::ostream &out) const
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 99316d2..3fc8280 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -298,6 +298,11 @@ struct scan_response
struct duplicate_request
{
+ 1: list<duplicate_entry> entries
+}
+
+struct duplicate_entry
+{
// The timestamp of this write.
1: optional i64 timestamp
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index 0e36678..7a75c28 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -123,6 +123,8 @@ class scan_response;
class duplicate_request;
+class duplicate_entry;
+
class duplicate_response;
typedef struct _update_request__isset
@@ -2074,7 +2076,53 @@ inline std::ostream &operator<<(std::ostream &out, const
scan_response &obj)
typedef struct _duplicate_request__isset
{
- _duplicate_request__isset()
+ _duplicate_request__isset() : entries(false) {}
+ bool entries : 1;
+} _duplicate_request__isset;
+
+class duplicate_request
+{
+public:
+ duplicate_request(const duplicate_request &);
+ duplicate_request(duplicate_request &&);
+ duplicate_request &operator=(const duplicate_request &);
+ duplicate_request &operator=(duplicate_request &&);
+ duplicate_request() {}
+
+ virtual ~duplicate_request() throw();
+ std::vector<duplicate_entry> entries;
+
+ _duplicate_request__isset __isset;
+
+ void __set_entries(const std::vector<duplicate_entry> &val);
+
+ bool operator==(const duplicate_request &rhs) const
+ {
+ if (!(entries == rhs.entries))
+ return false;
+ return true;
+ }
+ bool operator!=(const duplicate_request &rhs) const { return !(*this ==
rhs); }
+
+ bool operator<(const duplicate_request &) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
+
+ virtual void printTo(std::ostream &out) const;
+};
+
+void swap(duplicate_request &a, duplicate_request &b);
+
+inline std::ostream &operator<<(std::ostream &out, const duplicate_request
&obj)
+{
+ obj.printTo(out);
+ return out;
+}
+
+typedef struct _duplicate_entry__isset
+{
+ _duplicate_entry__isset()
: timestamp(false),
task_code(false),
raw_message(false),
@@ -2087,25 +2135,25 @@ typedef struct _duplicate_request__isset
bool raw_message : 1;
bool cluster_id : 1;
bool verify_timetag : 1;
-} _duplicate_request__isset;
+} _duplicate_entry__isset;
-class duplicate_request
+class duplicate_entry
{
public:
- duplicate_request(const duplicate_request &);
- duplicate_request(duplicate_request &&);
- duplicate_request &operator=(const duplicate_request &);
- duplicate_request &operator=(duplicate_request &&);
- duplicate_request() : timestamp(0), cluster_id(0), verify_timetag(0) {}
+ duplicate_entry(const duplicate_entry &);
+ duplicate_entry(duplicate_entry &&);
+ duplicate_entry &operator=(const duplicate_entry &);
+ duplicate_entry &operator=(duplicate_entry &&);
+ duplicate_entry() : timestamp(0), cluster_id(0), verify_timetag(0) {}
- virtual ~duplicate_request() throw();
+ virtual ~duplicate_entry() throw();
int64_t timestamp;
::dsn::task_code task_code;
::dsn::blob raw_message;
int8_t cluster_id;
bool verify_timetag;
- _duplicate_request__isset __isset;
+ _duplicate_entry__isset __isset;
void __set_timestamp(const int64_t val);
@@ -2117,7 +2165,7 @@ public:
void __set_verify_timetag(const bool val);
- bool operator==(const duplicate_request &rhs) const
+ bool operator==(const duplicate_entry &rhs) const
{
if (__isset.timestamp != rhs.__isset.timestamp)
return false;
@@ -2141,9 +2189,9 @@ public:
return false;
return true;
}
- bool operator!=(const duplicate_request &rhs) const { return !(*this ==
rhs); }
+ bool operator!=(const duplicate_entry &rhs) const { return !(*this ==
rhs); }
- bool operator<(const duplicate_request &) const;
+ bool operator<(const duplicate_entry &) const;
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
@@ -2151,9 +2199,9 @@ public:
virtual void printTo(std::ostream &out) const;
};
-void swap(duplicate_request &a, duplicate_request &b);
+void swap(duplicate_entry &a, duplicate_entry &b);
-inline std::ostream &operator<<(std::ostream &out, const duplicate_request
&obj)
+inline std::ostream &operator<<(std::ostream &out, const duplicate_entry &obj)
{
obj.printTo(out);
return out;
diff --git a/src/server/pegasus_mutation_duplicator.cpp
b/src/server/pegasus_mutation_duplicator.cpp
index 7244f4c..6fc4ecf 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -42,6 +42,11 @@ namespace replication {
namespace pegasus {
namespace server {
+DSN_DEFINE_uint32("pegasus",
+ duplicate_log_batch_megabytes,
+ 4,
+ "send mutation log batch size per rpc");
+
using namespace dsn::literals::chrono_literals;
/*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob
&data)
@@ -146,9 +151,9 @@ void
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
// errors are acceptable.
// TODO(wutao1): print the entire request for future debugging.
if (dsn::rand::next_double01() <= 0.01) {
- derror_replica("duplicate_rpc failed: {} [timestamp:{}]",
+ derror_replica("duplicate_rpc failed: {} [size:{}]",
err == dsn::ERR_OK ?
_client->get_error_string(perr) : err.to_string(),
- rpc.request().timestamp);
+ rpc.request().entries.size());
}
// duplicating an illegal write to server is unacceptable, fail fast.
dassert_replica(perr != PERR_INVALID_ARGUMENT,
rpc.response().error_hint);
@@ -184,9 +189,14 @@ void
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
{
_total_shipped_size = 0;
+ auto batch_request = dsn::make_unique<dsn::apps::duplicate_request>();
+ uint batch_bytes = 0;
+ int cur_count = 0;
+
for (auto mut : muts) {
// mut: 0=timestamp, 1=rpc_code, 2=raw_message
+ cur_count++;
dsn::task_code rpc_code = std::get<1>(mut);
dsn::blob raw_message = std::get<2>(mut);
auto dreq = dsn::make_unique<dsn::apps::duplicate_request>();
@@ -197,18 +207,29 @@ void
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
// destinations. A DUPLICATE is meant to be targeting only one
cluster.
continue;
} else {
- dreq->__set_raw_message(raw_message);
- dreq->__set_task_code(rpc_code);
- dreq->__set_timestamp(std::get<0>(mut));
- dreq->__set_cluster_id(get_current_cluster_id());
+ dsn::apps::duplicate_entry entry;
+ entry.__set_raw_message(raw_message);
+ entry.__set_task_code(rpc_code);
+ entry.__set_timestamp(std::get<0>(mut));
+ entry.__set_cluster_id(get_current_cluster_id());
+ batch_request->entries.emplace_back(std::move(entry));
+ batch_bytes += raw_message.length();
}
- uint64_t hash = get_hash_from_request(rpc_code, raw_message);
- duplicate_rpc rpc(std::move(dreq),
- dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
- 10_s, // TODO(wutao1): configurable timeout.
- hash);
- _inflights[hash].push_back(std::move(rpc));
+ if (batch_bytes >= (FLAGS_duplicate_log_batch_megabytes << 20) ||
+ cur_count == muts.size()) {
+ // All plog mutations of a replica belong to the same gpid even though their
+ // hashes differ, so the hash of the last mutation in a batch is used to
+ // represent the whole batch; it is still sent to the correct remote replica.
+ uint64_t hash = get_hash_from_request(rpc_code, raw_message);
+ duplicate_rpc rpc(std::move(batch_request),
+ dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
+ 100_s, // TODO(wutao1): configurable timeout.
+ hash);
+ _inflights[hash].push_back(std::move(rpc));
+ batch_request = dsn::make_unique<dsn::apps::duplicate_request>();
+ batch_bytes = 0;
+ }
}
if (_inflights.empty()) {
diff --git a/src/server/pegasus_mutation_duplicator.h
b/src/server/pegasus_mutation_duplicator.h
index 4d0b175..daeaf34 100644
--- a/src/server/pegasus_mutation_duplicator.h
+++ b/src/server/pegasus_mutation_duplicator.h
@@ -22,11 +22,13 @@
#include <dsn/dist/replication/mutation_duplicator.h>
#include <dsn/dist/replication/replica_base.h>
#include <rrdb/rrdb.code.definition.h>
+#include <dsn/utility/flags.h>
#include "client_lib/pegasus_client_factory_impl.h"
namespace pegasus {
namespace server {
+DSN_DECLARE_uint32(duplicate_log_batch_megabytes);
using namespace dsn::literals::chrono_literals;
diff --git a/src/server/pegasus_write_service.cpp
b/src/server/pegasus_write_service.cpp
index 48eaeb7..1a13753 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -308,69 +308,83 @@ void pegasus_write_service::clear_up_batch_states()
}
int pegasus_write_service::duplicate(int64_t decree,
- const dsn::apps::duplicate_request
&request,
+ const dsn::apps::duplicate_request
&requests,
dsn::apps::duplicate_response &resp)
{
// Verifies the cluster_id.
- if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) {
- resp.__set_error(rocksdb::Status::kInvalidArgument);
- resp.__set_error_hint("request cluster id is unconfigured");
- return empty_put(decree);
- }
- if (request.cluster_id == get_current_cluster_id()) {
- resp.__set_error(rocksdb::Status::kInvalidArgument);
- resp.__set_error_hint("self-duplicating");
- return empty_put(decree);
- }
-
- _pfc_duplicate_qps->increment();
- auto cleanup = dsn::defer([this, &request]() {
- uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000;
- if (latency_ms > _dup_lagging_write_threshold_ms) {
- _pfc_dup_lagging_writes->increment();
+ for (const auto &request : requests.entries) {
+ if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) {
+ resp.__set_error(rocksdb::Status::kInvalidArgument);
+ resp.__set_error_hint("request cluster id is unconfigured");
+ return empty_put(decree);
}
- _pfc_dup_time_lag->set(latency_ms);
- });
- dsn::message_ex *write = dsn::from_blob_to_received_msg(request.task_code,
request.raw_message);
- bool is_delete = request.task_code ==
dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
- request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE;
- auto remote_timetag = generate_timetag(request.timestamp,
request.cluster_id, is_delete);
- auto ctx = db_write_context::create_duplicate(decree, remote_timetag,
request.verify_timetag);
-
- if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
- multi_put_rpc rpc(write);
- resp.__set_error(_impl->multi_put(ctx, rpc.request(), rpc.response()));
- return resp.error;
- }
- if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
- multi_remove_rpc rpc(write);
- resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(),
rpc.response()));
- return resp.error;
- }
- put_rpc put;
- remove_rpc remove;
- if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
- request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
- int err = 0;
- if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
- put = put_rpc(write);
- err = _impl->batch_put(ctx, put.request(), put.response());
+ if (request.cluster_id == get_current_cluster_id()) {
+ resp.__set_error(rocksdb::Status::kInvalidArgument);
+ resp.__set_error_hint("self-duplicating");
+ return empty_put(decree);
}
- if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
- remove = remove_rpc(write);
- err = _impl->batch_remove(ctx.decree, remove.request(),
remove.response());
+
+ _pfc_duplicate_qps->increment();
+ auto cleanup = dsn::defer([this, &request]() {
+ uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000;
+ if (latency_ms > _dup_lagging_write_threshold_ms) {
+ _pfc_dup_lagging_writes->increment();
+ }
+ _pfc_dup_time_lag->set(latency_ms);
+ });
+ dsn::message_ex *write =
+ dsn::from_blob_to_received_msg(request.task_code,
request.raw_message);
+ bool is_delete = request.task_code ==
dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
+ request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE;
+ auto remote_timetag = generate_timetag(request.timestamp,
request.cluster_id, is_delete);
+ auto ctx =
+ db_write_context::create_duplicate(decree, remote_timetag,
request.verify_timetag);
+
+ if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
+ multi_put_rpc rpc(write);
+ resp.__set_error(_impl->multi_put(ctx, rpc.request(),
rpc.response()));
+ if (resp.error != rocksdb::Status::kOk) {
+ return resp.error;
+ }
+ continue;
}
- if (!err) {
- err = _impl->batch_commit(ctx.decree);
- } else {
- _impl->batch_abort(ctx.decree, err);
+ if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
+ multi_remove_rpc rpc(write);
+ resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(),
rpc.response()));
+ if (resp.error != rocksdb::Status::kOk) {
+ return resp.error;
+ }
+ continue;
}
- resp.__set_error(err);
- return resp.error;
+ put_rpc put;
+ remove_rpc remove;
+ if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
+ request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
+ int err = 0;
+ if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
+ put = put_rpc(write);
+ err = _impl->batch_put(ctx, put.request(), put.response());
+ }
+ if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
+ remove = remove_rpc(write);
+ err = _impl->batch_remove(ctx.decree, remove.request(),
remove.response());
+ }
+ if (!err) {
+ err = _impl->batch_commit(ctx.decree);
+ } else {
+ _impl->batch_abort(ctx.decree, err);
+ }
+ resp.__set_error(err);
+ if (resp.error != rocksdb::Status::kOk) {
+ return resp.error;
+ }
+ continue;
+ }
+ resp.__set_error(rocksdb::Status::kInvalidArgument);
+ resp.__set_error_hint(fmt::format("unrecognized task code {}",
request.task_code));
+ return empty_put(ctx.decree);
}
- resp.__set_error(rocksdb::Status::kInvalidArgument);
- resp.__set_error_hint(fmt::format("unrecognized task code {}",
request.task_code));
- return empty_put(ctx.decree);
+ return resp.error;
}
int pegasus_write_service::ingest_files(int64_t decree,
diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp
b/src/server/test/pegasus_mutation_duplicator_test.cpp
index 2dab5a4..f61fe69 100644
--- a/src/server/test/pegasus_mutation_duplicator_test.cpp
+++ b/src/server/test/pegasus_mutation_duplicator_test.cpp
@@ -19,6 +19,7 @@
#include "server/pegasus_mutation_duplicator.h"
#include "base/pegasus_rpc_types.h"
+#include "base/value_schema_manager.h"
#include "pegasus_server_test_base.h"
#include <gtest/gtest.h>
@@ -49,19 +50,27 @@ public:
auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
duplicator->set_task_environment(&_env);
+ std::string sort_key;
+ for (int i = 0; i < 1000; i++) {
+ sort_key = fmt::format("{}_{}", sort_key, i);
+ }
+
mutation_tuple_set muts;
- for (uint64_t i = 0; i < 100; i++) {
+ uint total_bytes = 0;
+ for (uint64_t i = 0; i < 4000; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
dsn::apps::update_request request;
- pegasus::pegasus_generate_key(request.key, std::string("hash"),
std::string("sort"));
+ pegasus::pegasus_generate_key(request.key, std::string("hash"),
sort_key);
dsn::message_ptr msg =
dsn::from_thrift_request_to_received_message(request,
dsn::apps::RPC_RRDB_RRDB_PUT);
auto data = dsn::move_message_to_blob(msg.get());
muts.insert(std::make_tuple(ts, code, data));
+ total_bytes += data.length();
}
+ auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes
<< 20) + 1;
size_t total_shipped_size = 0;
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator
*>(duplicator.get());
@@ -69,14 +78,13 @@ public:
{
duplicator->duplicate(muts, [](size_t) {});
- size_t total_size = 100;
- while (total_size > 0) {
+ while (batch_count > 0) {
// ensure mutations having the same hash are sending
sequentially.
ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
- total_size--;
- ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(),
total_size);
+ batch_count--;
+ ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(),
batch_count);
auto rpc = duplicate_rpc::mail_box().back();
duplicate_rpc::mail_box().pop_back();
@@ -106,19 +114,27 @@ public:
auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
duplicator->set_task_environment(&_env);
+ std::string sort_key;
+ for (int i = 0; i < 1000; i++) {
+ sort_key = fmt::format("{}_{}", sort_key, i);
+ }
+
mutation_tuple_set muts;
- for (uint64_t i = 0; i < 10; i++) {
+ uint total_bytes = 0;
+ for (uint64_t i = 0; i < 4000; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
dsn::apps::update_request request;
- pegasus::pegasus_generate_key(request.key, std::string("hash"),
std::string("sort"));
+ pegasus::pegasus_generate_key(request.key, std::string("hash"),
sort_key);
dsn::message_ptr msg =
dsn::from_thrift_request_to_received_message(request,
dsn::apps::RPC_RRDB_RRDB_PUT);
auto data = dsn::move_message_to_blob(msg.get());
muts.insert(std::make_tuple(ts, code, data));
+ total_bytes += data.length();
}
+ auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes
<< 20) + 1;
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator
*>(duplicator.get());
RPC_MOCKING(duplicate_rpc)
@@ -127,7 +143,7 @@ public:
auto rpc = duplicate_rpc::mail_box().back();
duplicate_rpc::mail_box().pop_back();
- ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+ ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(),
batch_count - 1);
// failed
duplicator_impl->on_duplicate_reply(
@@ -139,7 +155,7 @@ public:
// retry infinitely
ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
- ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+ ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(),
batch_count - 1);
duplicate_rpc::mail_box().clear();
// with other error
@@ -148,7 +164,7 @@ public:
_tracker.wait_outstanding_tasks();
ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
- ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+ ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(),
batch_count - 1);
duplicate_rpc::mail_box().clear();
// with other error
@@ -158,7 +174,7 @@ public:
_tracker.wait_outstanding_tasks();
ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
- ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+ ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(),
batch_count - 1);
duplicate_rpc::mail_box().clear();
}
}
@@ -169,30 +185,38 @@ public:
auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
duplicator->set_task_environment(&_env);
- size_t total_size = 3000;
+ size_t total_size = 4000;
+ std::string sort_key;
+ for (int i = 0; i < 1000; i++) {
+ sort_key = fmt::format("{}_{}", sort_key, i);
+ }
+
mutation_tuple_set muts;
+ uint total_bytes = 0;
for (uint64_t i = 0; i < total_size; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
dsn::apps::update_request request;
pegasus::pegasus_generate_key(
- request.key, std::string("hash") + std::to_string(i),
std::string("sort"));
+ request.key, std::string("hash") + std::to_string(i),
sort_key);
dsn::message_ptr msg =
dsn::from_thrift_request_to_received_message(request, code);
auto data = dsn::move_message_to_blob(msg.get());
muts.insert(std::make_tuple(ts, code, data));
+ total_bytes += data.length();
}
+ auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes
<< 20) + 1;
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator
*>(duplicator.get());
RPC_MOCKING(duplicate_rpc)
{
duplicator->duplicate(muts, [](size_t) {});
- // ensure each bucket has only 1 request and each request is
+ // ensure each bucket has only 1 batch request and each request is
// isolated with others.
- ASSERT_EQ(duplicator_impl->_inflights.size(), total_size);
- ASSERT_EQ(duplicate_rpc::mail_box().size(), total_size);
+ ASSERT_EQ(duplicator_impl->_inflights.size(), batch_count);
+ ASSERT_EQ(duplicate_rpc::mail_box().size(), batch_count);
for (const auto &ents : duplicator_impl->_inflights) {
ASSERT_EQ(ents.second.size(), 0);
}
@@ -223,7 +247,9 @@ public:
private:
static uint64_t get_hash(const duplicate_rpc &rpc)
{
- return get_hash_from_request(rpc.request().task_code,
rpc.request().raw_message);
+ auto size = rpc.request().entries.size();
+ return get_hash_from_request(rpc.request().entries[size - 1].task_code,
+ rpc.request().entries[size -
1].raw_message);
}
};
@@ -322,9 +348,11 @@ TEST_F(pegasus_mutation_duplicator_test,
duplicate_duplicate)
// a duplicate from onebox2
dsn::apps::duplicate_request dup;
- dup.cluster_id = 2;
- dup.raw_message = data;
- dup.timestamp = 200;
+ dsn::apps::duplicate_entry entry;
+ entry.cluster_id = 2;
+ entry.raw_message = data;
+ entry.timestamp = 200;
+ dup.entries.emplace_back(entry);
msg = dsn::from_thrift_request_to_received_message(dup,
dsn::apps::RPC_RRDB_RRDB_DUPLICATE);
data = dsn::move_message_to_blob(msg.get());
diff --git a/src/server/test/pegasus_write_service_test.cpp
b/src/server/test/pegasus_write_service_test.cpp
index 839da27..9bd3d7f 100644
--- a/src/server/test/pegasus_write_service_test.cpp
+++ b/src/server/test/pegasus_write_service_test.cpp
@@ -220,8 +220,9 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched)
}
dsn::apps::duplicate_request duplicate;
- duplicate.timestamp = 1000;
- duplicate.cluster_id = 2;
+ dsn::apps::duplicate_entry entry;
+ entry.timestamp = 1000;
+ entry.cluster_id = 2;
dsn::apps::duplicate_response resp;
{
@@ -233,9 +234,9 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched)
}
dsn::message_ptr mput_msg = pegasus::create_multi_put_request(mput);
- duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_PUT;
- duplicate.raw_message = dsn::move_message_to_blob(mput_msg.get());
-
+ entry.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_PUT;
+ entry.raw_message = dsn::move_message_to_blob(mput_msg.get());
+ duplicate.entries.emplace_back(entry);
_write_svc->duplicate(1, duplicate, resp);
ASSERT_EQ(resp.error, 0);
}
@@ -248,8 +249,8 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched)
}
dsn::message_ptr mremove_msg =
pegasus::create_multi_remove_request(mremove);
- duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE;
- duplicate.raw_message = dsn::move_message_to_blob(mremove_msg.get());
+ entry.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE;
+ entry.raw_message = dsn::move_message_to_blob(mremove_msg.get());
_write_svc->duplicate(1, duplicate, resp);
ASSERT_EQ(resp.error, 0);
@@ -270,8 +271,9 @@ TEST_F(pegasus_write_service_test, duplicate_batched)
{
dsn::apps::duplicate_request duplicate;
- duplicate.timestamp = 1000;
- duplicate.cluster_id = 2;
+ dsn::apps::duplicate_entry entry;
+ entry.timestamp = 1000;
+ entry.cluster_id = 2;
dsn::apps::duplicate_response resp;
for (int i = 0; i < kv_num; i++) {
@@ -280,8 +282,9 @@ TEST_F(pegasus_write_service_test, duplicate_batched)
request.value.assign(value[i].data(), 0, value[i].size());
dsn::message_ptr msg_ptr = pegasus::create_put_request(request);
- duplicate.raw_message = dsn::move_message_to_blob(msg_ptr.get());
- duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_PUT;
+ entry.raw_message = dsn::move_message_to_blob(msg_ptr.get());
+ entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT;
+ duplicate.entries.emplace_back(entry);
_write_svc->duplicate(1, duplicate, resp);
ASSERT_EQ(resp.error, 0);
}
@@ -296,8 +299,10 @@ TEST_F(pegasus_write_service_test,
illegal_duplicate_request)
// cluster=13 is from nowhere
dsn::apps::duplicate_request duplicate;
- duplicate.cluster_id = 13;
- duplicate.timestamp = 10;
+ dsn::apps::duplicate_entry entry;
+ entry.timestamp = 1000;
+ entry.cluster_id = 2;
+ duplicate.entries.emplace_back(entry);
dsn::apps::duplicate_response resp;
dsn::apps::update_request request;
@@ -305,8 +310,8 @@ TEST_F(pegasus_write_service_test,
illegal_duplicate_request)
request.value.assign(value.data(), 0, value.size());
dsn::message_ptr msg_ptr = pegasus::create_put_request(request); // auto
release memory
- duplicate.raw_message = dsn::move_message_to_blob(msg_ptr.get());
- duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_PUT;
+ entry.raw_message = dsn::move_message_to_blob(msg_ptr.get());
+ entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT;
_write_svc->duplicate(1, duplicate, resp);
ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]