This is an automated email from the ASF dual-hosted git repository. jiashuo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit fe149ca8d82711d942583f2838bb2853c48a5455 Author: Jiashuo <[email protected]> AuthorDate: Wed Mar 9 10:53:19 2022 +0800 feat(dup_enhancement#18): support batch send log on master side and handle it on follower side (#919) --- src/base/rrdb_types.cpp | 227 ++++++++++++++++----- src/idl/rrdb.thrift | 5 + src/include/rrdb/rrdb_types.h | 78 +++++-- src/server/pegasus_mutation_duplicator.cpp | 45 ++-- src/server/pegasus_mutation_duplicator.h | 2 + src/server/pegasus_write_service.cpp | 124 ++++++----- .../test/pegasus_mutation_duplicator_test.cpp | 70 +++++-- src/server/test/pegasus_write_service_test.cpp | 35 ++-- 8 files changed, 413 insertions(+), 173 deletions(-) diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index 4ef46f3..2d334a4 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -4933,37 +4933,154 @@ void scan_response::printTo(std::ostream &out) const duplicate_request::~duplicate_request() throw() {} -void duplicate_request::__set_timestamp(const int64_t val) +void duplicate_request::__set_entries(const std::vector<duplicate_entry> &val) +{ + this->entries = val; +} + +uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_LIST) { + { + this->entries.clear(); + uint32_t _size154; + ::apache::thrift::protocol::TType _etype157; + xfer += iprot->readListBegin(_etype157, _size154); + this->entries.resize(_size154); + uint32_t _i158; + for (_i158 = 0; _i158 < _size154; ++_i158) { + xfer += this->entries[_i158].read(iprot); + } + xfer += iprot->readListEnd(); + } + this->__isset.entries = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("duplicate_request"); + + xfer += oprot->writeFieldBegin("entries", ::apache::thrift::protocol::T_LIST, 1); + { + xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, + static_cast<uint32_t>(this->entries.size())); + std::vector<duplicate_entry>::const_iterator _iter159; + for (_iter159 = this->entries.begin(); _iter159 != this->entries.end(); ++_iter159) { + xfer += (*_iter159).write(oprot); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(duplicate_request &a, duplicate_request &b) +{ + using ::std::swap; + swap(a.entries, b.entries); + swap(a.__isset, b.__isset); +} + +duplicate_request::duplicate_request(const duplicate_request &other160) +{ + entries = other160.entries; + __isset = other160.__isset; +} +duplicate_request::duplicate_request(duplicate_request &&other161) +{ + entries = std::move(other161.entries); + __isset = std::move(other161.__isset); +} +duplicate_request &duplicate_request::operator=(const duplicate_request &other162) +{ + entries = other162.entries; + __isset = other162.__isset; + return *this; +} +duplicate_request &duplicate_request::operator=(duplicate_request &&other163) +{ + entries = std::move(other163.entries); + __isset = std::move(other163.__isset); + return *this; +} +void duplicate_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "duplicate_request("; + out << "entries=" << to_string(entries); + out << ")"; +} + +duplicate_entry::~duplicate_entry() throw() {} + +void duplicate_entry::__set_timestamp(const int64_t val) { this->timestamp = val; __isset.timestamp = true; } -void duplicate_request::__set_task_code(const ::dsn::task_code &val) +void duplicate_entry::__set_task_code(const ::dsn::task_code &val) { this->task_code = val; __isset.task_code = true; } -void duplicate_request::__set_raw_message(const ::dsn::blob &val) +void duplicate_entry::__set_raw_message(const ::dsn::blob &val) { this->raw_message = val; __isset.raw_message = true; } -void duplicate_request::__set_cluster_id(const int8_t val) +void duplicate_entry::__set_cluster_id(const int8_t val) { this->cluster_id = val; __isset.cluster_id = true; } -void duplicate_request::__set_verify_timetag(const bool val) +void duplicate_entry::__set_verify_timetag(const bool val) { this->verify_timetag = val; __isset.verify_timetag = true; } -uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) +uint32_t duplicate_entry::read(::apache::thrift::protocol::TProtocol *iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -5034,11 +5151,11 @@ uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) return xfer; } -uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) const +uint32_t duplicate_entry::write(::apache::thrift::protocol::TProtocol *oprot) const { uint32_t xfer = 0; apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); - xfer += oprot->writeStructBegin("duplicate_request"); + xfer += oprot->writeStructBegin("duplicate_entry"); if (this->__isset.timestamp) { xfer += oprot->writeFieldBegin("timestamp", ::apache::thrift::protocol::T_I64, 1); @@ -5070,7 +5187,7 @@ uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) return xfer; } -void swap(duplicate_request &a, duplicate_request &b) +void swap(duplicate_entry &a, duplicate_entry &b) { using ::std::swap; swap(a.timestamp, b.timestamp); @@ -5081,48 +5198,48 @@ void swap(duplicate_request &a, duplicate_request &b) swap(a.__isset, b.__isset); } -duplicate_request::duplicate_request(const duplicate_request &other154) +duplicate_entry::duplicate_entry(const duplicate_entry &other164) { - timestamp = other154.timestamp; - task_code = other154.task_code; - raw_message = other154.raw_message; - cluster_id = other154.cluster_id; - verify_timetag = other154.verify_timetag; - __isset = other154.__isset; + timestamp = other164.timestamp; + task_code = other164.task_code; + raw_message = other164.raw_message; + cluster_id = other164.cluster_id; + verify_timetag = other164.verify_timetag; + __isset = other164.__isset; } -duplicate_request::duplicate_request(duplicate_request &&other155) +duplicate_entry::duplicate_entry(duplicate_entry &&other165) { - timestamp = std::move(other155.timestamp); - task_code = std::move(other155.task_code); - raw_message = std::move(other155.raw_message); - cluster_id = std::move(other155.cluster_id); - verify_timetag = std::move(other155.verify_timetag); - __isset = std::move(other155.__isset); + timestamp = std::move(other165.timestamp); + task_code = std::move(other165.task_code); + raw_message = std::move(other165.raw_message); + cluster_id = std::move(other165.cluster_id); + verify_timetag = std::move(other165.verify_timetag); + __isset = std::move(other165.__isset); } -duplicate_request &duplicate_request::operator=(const duplicate_request &other156) +duplicate_entry &duplicate_entry::operator=(const duplicate_entry &other166) { - timestamp = other156.timestamp; - task_code = other156.task_code; - raw_message = other156.raw_message; - cluster_id = other156.cluster_id; - verify_timetag = other156.verify_timetag; - __isset = other156.__isset; + timestamp = other166.timestamp; + task_code = other166.task_code; + raw_message = other166.raw_message; + cluster_id = other166.cluster_id; + verify_timetag = other166.verify_timetag; + __isset = other166.__isset; return *this; } -duplicate_request &duplicate_request::operator=(duplicate_request &&other157) +duplicate_entry &duplicate_entry::operator=(duplicate_entry &&other167) { - timestamp = std::move(other157.timestamp); - task_code = std::move(other157.task_code); - raw_message = std::move(other157.raw_message); - cluster_id = std::move(other157.cluster_id); - verify_timetag = std::move(other157.verify_timetag); - __isset = std::move(other157.__isset); + timestamp = std::move(other167.timestamp); + task_code = std::move(other167.task_code); + raw_message = std::move(other167.raw_message); + cluster_id = std::move(other167.cluster_id); + verify_timetag = std::move(other167.verify_timetag); + __isset = std::move(other167.__isset); return *this; } -void duplicate_request::printTo(std::ostream &out) const +void duplicate_entry::printTo(std::ostream &out) const { using ::apache::thrift::to_string; - out << "duplicate_request("; + out << "duplicate_entry("; out << "timestamp="; (__isset.timestamp ? (out << to_string(timestamp)) : (out << "<null>")); out << ", " @@ -5230,30 +5347,30 @@ void swap(duplicate_response &a, duplicate_response &b) swap(a.__isset, b.__isset); } -duplicate_response::duplicate_response(const duplicate_response &other158) +duplicate_response::duplicate_response(const duplicate_response &other168) { - error = other158.error; - error_hint = other158.error_hint; - __isset = other158.__isset; + error = other168.error; + error_hint = other168.error_hint; + __isset = other168.__isset; } -duplicate_response::duplicate_response(duplicate_response &&other159) +duplicate_response::duplicate_response(duplicate_response &&other169) { - error = std::move(other159.error); - error_hint = std::move(other159.error_hint); - __isset = std::move(other159.__isset); + error = std::move(other169.error); + error_hint = std::move(other169.error_hint); + __isset = std::move(other169.__isset); } -duplicate_response &duplicate_response::operator=(const duplicate_response &other160) +duplicate_response &duplicate_response::operator=(const duplicate_response &other170) { - error = other160.error; - error_hint = other160.error_hint; - __isset = other160.__isset; + error = other170.error; + error_hint = other170.error_hint; + __isset = other170.__isset; return *this; } -duplicate_response &duplicate_response::operator=(duplicate_response &&other161) +duplicate_response &duplicate_response::operator=(duplicate_response &&other171) { - error = std::move(other161.error); - error_hint = std::move(other161.error_hint); - __isset = std::move(other161.__isset); + error = std::move(other171.error); + error_hint = std::move(other171.error_hint); + __isset = std::move(other171.__isset); return *this; } void duplicate_response::printTo(std::ostream &out) const diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index 99316d2..3fc8280 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -298,6 +298,11 @@ struct scan_response struct duplicate_request { + 1: list<duplicate_entry> entries +} + +struct duplicate_entry +{ // The timestamp of this write. 1: optional i64 timestamp diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index 0e36678..7a75c28 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -123,6 +123,8 @@ class scan_response; class duplicate_request; +class duplicate_entry; + class duplicate_response; typedef struct _update_request__isset @@ -2074,7 +2076,53 @@ inline std::ostream &operator<<(std::ostream &out, const scan_response &obj) typedef struct _duplicate_request__isset { - _duplicate_request__isset() + _duplicate_request__isset() : entries(false) {} + bool entries : 1; +} _duplicate_request__isset; + +class duplicate_request +{ +public: + duplicate_request(const duplicate_request &); + duplicate_request(duplicate_request &&); + duplicate_request &operator=(const duplicate_request &); + duplicate_request &operator=(duplicate_request &&); + duplicate_request() {} + + virtual ~duplicate_request() throw(); + std::vector<duplicate_entry> entries; + + _duplicate_request__isset __isset; + + void __set_entries(const std::vector<duplicate_entry> &val); + + bool operator==(const duplicate_request &rhs) const + { + if (!(entries == rhs.entries)) + return false; + return true; + } + bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); } + + bool operator<(const duplicate_request &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(duplicate_request &a, duplicate_request &b); + +inline std::ostream &operator<<(std::ostream &out, const duplicate_request &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _duplicate_entry__isset +{ + _duplicate_entry__isset() : timestamp(false), task_code(false), raw_message(false), @@ -2087,25 +2135,25 @@ typedef struct _duplicate_request__isset bool raw_message : 1; bool cluster_id : 1; bool verify_timetag : 1; -} _duplicate_request__isset; +} _duplicate_entry__isset; -class duplicate_request +class duplicate_entry { public: - duplicate_request(const duplicate_request &); - duplicate_request(duplicate_request &&); - duplicate_request &operator=(const duplicate_request &); - duplicate_request &operator=(duplicate_request &&); - duplicate_request() : timestamp(0), cluster_id(0), verify_timetag(0) {} + duplicate_entry(const duplicate_entry &); + duplicate_entry(duplicate_entry &&); + duplicate_entry &operator=(const duplicate_entry &); + duplicate_entry &operator=(duplicate_entry &&); + duplicate_entry() : timestamp(0), cluster_id(0), verify_timetag(0) {} - virtual ~duplicate_request() throw(); + virtual ~duplicate_entry() throw(); int64_t timestamp; ::dsn::task_code task_code; ::dsn::blob raw_message; int8_t cluster_id; bool verify_timetag; - _duplicate_request__isset __isset; + _duplicate_entry__isset __isset; void __set_timestamp(const int64_t val); @@ -2117,7 +2165,7 @@ public: void __set_verify_timetag(const bool val); - bool operator==(const duplicate_request &rhs) const + bool operator==(const duplicate_entry &rhs) const { if (__isset.timestamp != rhs.__isset.timestamp) return false; @@ -2141,9 +2189,9 @@ public: return false; return true; } - bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); } + bool operator!=(const duplicate_entry &rhs) const { return !(*this == rhs); } - bool operator<(const duplicate_request &) const; + bool operator<(const duplicate_entry &) const; uint32_t read(::apache::thrift::protocol::TProtocol *iprot); uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; @@ -2151,9 +2199,9 @@ public: virtual void printTo(std::ostream &out) const; }; -void swap(duplicate_request &a, duplicate_request &b); +void swap(duplicate_entry &a, duplicate_entry &b); -inline std::ostream &operator<<(std::ostream &out, const duplicate_request &obj) +inline std::ostream &operator<<(std::ostream &out, const duplicate_entry &obj) { obj.printTo(out); return out; diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 7244f4c..6fc4ecf 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -42,6 +42,11 @@ namespace replication { namespace pegasus { namespace server { +DSN_DEFINE_uint32("pegasus", + duplicate_log_batch_megabytes, + 4, + "send mutation log batch size per rpc"); + using namespace dsn::literals::chrono_literals; /*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob &data) @@ -146,9 +151,9 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, // errors are acceptable. // TODO(wutao1): print the entire request for future debugging. if (dsn::rand::next_double01() <= 0.01) { - derror_replica("duplicate_rpc failed: {} [timestamp:{}]", + derror_replica("duplicate_rpc failed: {} [size:{}]", err == dsn::ERR_OK ? _client->get_error_string(perr) : err.to_string(), - rpc.request().timestamp); + rpc.request().entries.size()); } // duplicating an illegal write to server is unacceptable, fail fast. dassert_replica(perr != PERR_INVALID_ARGUMENT, rpc.response().error_hint); @@ -184,9 +189,14 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb { _total_shipped_size = 0; + auto batch_request = dsn::make_unique<dsn::apps::duplicate_request>(); + uint batch_bytes = 0; + int cur_count = 0; + for (auto mut : muts) { // mut: 0=timestamp, 1=rpc_code, 2=raw_message + cur_count++; dsn::task_code rpc_code = std::get<1>(mut); dsn::blob raw_message = std::get<2>(mut); auto dreq = dsn::make_unique<dsn::apps::duplicate_request>(); @@ -197,18 +207,29 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb // destinations. A DUPLICATE is meant to be targeting only one cluster. continue; } else { - dreq->__set_raw_message(raw_message); - dreq->__set_task_code(rpc_code); - dreq->__set_timestamp(std::get<0>(mut)); - dreq->__set_cluster_id(get_current_cluster_id()); + dsn::apps::duplicate_entry entry; + entry.__set_raw_message(raw_message); + entry.__set_task_code(rpc_code); + entry.__set_timestamp(std::get<0>(mut)); + entry.__set_cluster_id(get_current_cluster_id()); + batch_request->entries.emplace_back(std::move(entry)); + batch_bytes += raw_message.length(); } - uint64_t hash = get_hash_from_request(rpc_code, raw_message); - duplicate_rpc rpc(std::move(dreq), - dsn::apps::RPC_RRDB_RRDB_DUPLICATE, - 10_s, // TODO(wutao1): configurable timeout. - hash); - _inflights[hash].push_back(std::move(rpc)); + if (batch_bytes >= (FLAGS_duplicate_log_batch_megabytes << 20) || + cur_count == muts.size()) { + // since all the plog's mutations of replica belong to same gpid though the hash of + // mutation is different, use the last mutation of one batch to get and represents the + // current hash value, it will still send to remote correct replica + uint64_t hash = get_hash_from_request(rpc_code, raw_message); + duplicate_rpc rpc(std::move(batch_request), + dsn::apps::RPC_RRDB_RRDB_DUPLICATE, + 100_s, // TODO(wutao1): configurable timeout. + hash); + _inflights[hash].push_back(std::move(rpc)); + batch_request = dsn::make_unique<dsn::apps::duplicate_request>(); + batch_bytes = 0; + } } if (_inflights.empty()) { diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index 4d0b175..daeaf34 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -22,11 +22,13 @@ #include <dsn/dist/replication/mutation_duplicator.h> #include <dsn/dist/replication/replica_base.h> #include <rrdb/rrdb.code.definition.h> +#include <dsn/utility/flags.h> #include "client_lib/pegasus_client_factory_impl.h" namespace pegasus { namespace server { +DSN_DECLARE_uint32(duplicate_log_batch_megabytes); using namespace dsn::literals::chrono_literals; diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 48eaeb7..1a13753 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -308,69 +308,83 @@ void pegasus_write_service::clear_up_batch_states() } int pegasus_write_service::duplicate(int64_t decree, - const dsn::apps::duplicate_request &request, + const dsn::apps::duplicate_request &requests, dsn::apps::duplicate_response &resp) { // Verifies the cluster_id. - if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) { - resp.__set_error(rocksdb::Status::kInvalidArgument); - resp.__set_error_hint("request cluster id is unconfigured"); - return empty_put(decree); - } - if (request.cluster_id == get_current_cluster_id()) { - resp.__set_error(rocksdb::Status::kInvalidArgument); - resp.__set_error_hint("self-duplicating"); - return empty_put(decree); - } - - _pfc_duplicate_qps->increment(); - auto cleanup = dsn::defer([this, &request]() { - uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000; - if (latency_ms > _dup_lagging_write_threshold_ms) { - _pfc_dup_lagging_writes->increment(); + for (const auto &request : requests.entries) { + if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) { + resp.__set_error(rocksdb::Status::kInvalidArgument); + resp.__set_error_hint("request cluster id is unconfigured"); + return empty_put(decree); } - _pfc_dup_time_lag->set(latency_ms); - }); - dsn::message_ex *write = dsn::from_blob_to_received_msg(request.task_code, request.raw_message); - bool is_delete = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE || - request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE; - auto remote_timetag = generate_timetag(request.timestamp, request.cluster_id, is_delete); - auto ctx = db_write_context::create_duplicate(decree, remote_timetag, request.verify_timetag); - - if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { - multi_put_rpc rpc(write); - resp.__set_error(_impl->multi_put(ctx, rpc.request(), rpc.response())); - return resp.error; - } - if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { - multi_remove_rpc rpc(write); - resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(), rpc.response())); - return resp.error; - } - put_rpc put; - remove_rpc remove; - if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT || - request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { - int err = 0; - if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) { - put = put_rpc(write); - err = _impl->batch_put(ctx, put.request(), put.response()); + if (request.cluster_id == get_current_cluster_id()) { + resp.__set_error(rocksdb::Status::kInvalidArgument); + resp.__set_error_hint("self-duplicating"); + return empty_put(decree); } - if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { - remove = remove_rpc(write); - err = _impl->batch_remove(ctx.decree, remove.request(), remove.response()); + + _pfc_duplicate_qps->increment(); + auto cleanup = dsn::defer([this, &request]() { + uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000; + if (latency_ms > _dup_lagging_write_threshold_ms) { + _pfc_dup_lagging_writes->increment(); + } + _pfc_dup_time_lag->set(latency_ms); + }); + dsn::message_ex *write = + dsn::from_blob_to_received_msg(request.task_code, request.raw_message); + bool is_delete = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE || + request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE; + auto remote_timetag = generate_timetag(request.timestamp, request.cluster_id, is_delete); + auto ctx = + db_write_context::create_duplicate(decree, remote_timetag, request.verify_timetag); + + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { + multi_put_rpc rpc(write); + resp.__set_error(_impl->multi_put(ctx, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; } - if (!err) { - err = _impl->batch_commit(ctx.decree); - } else { - _impl->batch_abort(ctx.decree, err); + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { + multi_remove_rpc rpc(write); + resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; } - resp.__set_error(err); - return resp.error; + put_rpc put; + remove_rpc remove; + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT || + request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { + int err = 0; + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) { + put = put_rpc(write); + err = _impl->batch_put(ctx, put.request(), put.response()); + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { + remove = remove_rpc(write); + err = _impl->batch_remove(ctx.decree, remove.request(), remove.response()); + } + if (!err) { + err = _impl->batch_commit(ctx.decree); + } else { + _impl->batch_abort(ctx.decree, err); + } + resp.__set_error(err); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + resp.__set_error(rocksdb::Status::kInvalidArgument); + resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code)); + return empty_put(ctx.decree); } - resp.__set_error(rocksdb::Status::kInvalidArgument); - resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code)); - return empty_put(ctx.decree); + return resp.error; } int pegasus_write_service::ingest_files(int64_t decree, diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index 2dab5a4..f61fe69 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -19,6 +19,7 @@ #include "server/pegasus_mutation_duplicator.h" #include "base/pegasus_rpc_types.h" +#include "base/value_schema_manager.h" #include "pegasus_server_test_base.h" #include <gtest/gtest.h> @@ -49,19 +50,27 @@ public: auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); duplicator->set_task_environment(&_env); + std::string sort_key; + for (int i = 0; i < 1000; i++) { + sort_key = fmt::format("{}_{}", sort_key, i); + } + mutation_tuple_set muts; - for (uint64_t i = 0; i < 100; i++) { + uint total_bytes = 0; + for (uint64_t i = 0; i < 4000; i++) { uint64_t ts = 200 + i; dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; dsn::apps::update_request request; - pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort")); + pegasus::pegasus_generate_key(request.key, std::string("hash"), sort_key); dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT); auto data = dsn::move_message_to_blob(msg.get()); muts.insert(std::make_tuple(ts, code, data)); + total_bytes += data.length(); } + auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1; size_t total_shipped_size = 0; auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get()); @@ -69,14 +78,13 @@ public: { duplicator->duplicate(muts, [](size_t) {}); - size_t total_size = 100; - while (total_size > 0) { + while (batch_count > 0) { // ensure mutations having the same hash are sending sequentially. ASSERT_EQ(duplicator_impl->_inflights.size(), 1); ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); - total_size--; - ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), total_size); + batch_count--; + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), batch_count); auto rpc = duplicate_rpc::mail_box().back(); duplicate_rpc::mail_box().pop_back(); @@ -106,19 +114,27 @@ public: auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); duplicator->set_task_environment(&_env); + std::string sort_key; + for (int i = 0; i < 1000; i++) { + sort_key = fmt::format("{}_{}", sort_key, i); + } + mutation_tuple_set muts; - for (uint64_t i = 0; i < 10; i++) { + uint total_bytes = 0; + for (uint64_t i = 0; i < 4000; i++) { uint64_t ts = 200 + i; dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; dsn::apps::update_request request; - pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort")); + pegasus::pegasus_generate_key(request.key, std::string("hash"), sort_key); dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT); auto data = dsn::move_message_to_blob(msg.get()); muts.insert(std::make_tuple(ts, code, data)); + total_bytes += data.length(); } + auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1; auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get()); RPC_MOCKING(duplicate_rpc) @@ -127,7 +143,7 @@ public: auto rpc = duplicate_rpc::mail_box().back(); duplicate_rpc::mail_box().pop_back(); - ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), batch_count - 1); // failed duplicator_impl->on_duplicate_reply( @@ -139,7 +155,7 @@ public: // retry infinitely ASSERT_EQ(duplicator_impl->_inflights.size(), 1); ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); - ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), batch_count - 1); duplicate_rpc::mail_box().clear(); // with other error @@ -148,7 +164,7 @@ public: _tracker.wait_outstanding_tasks(); ASSERT_EQ(duplicator_impl->_inflights.size(), 1); ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); - ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), batch_count - 1); duplicate_rpc::mail_box().clear(); // with other error @@ -158,7 +174,7 @@ public: _tracker.wait_outstanding_tasks(); ASSERT_EQ(duplicator_impl->_inflights.size(), 1); ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); - ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), batch_count - 1); duplicate_rpc::mail_box().clear(); } } @@ -169,30 +185,38 @@ public: auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); duplicator->set_task_environment(&_env); - size_t total_size = 3000; + size_t total_size = 4000; + std::string sort_key; + for (int i = 0; i < 1000; i++) { + sort_key = fmt::format("{}_{}", sort_key, i); + } + mutation_tuple_set muts; + uint total_bytes = 0; for (uint64_t i = 0; i < total_size; i++) { uint64_t ts = 200 + i; dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; dsn::apps::update_request request; pegasus::pegasus_generate_key( - request.key, std::string("hash") + std::to_string(i), std::string("sort")); + request.key, std::string("hash") + std::to_string(i), sort_key); dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(request, code); auto data = dsn::move_message_to_blob(msg.get()); muts.insert(std::make_tuple(ts, code, data)); + total_bytes += data.length(); } + auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1; auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get()); RPC_MOCKING(duplicate_rpc) { duplicator->duplicate(muts, [](size_t) {}); - // ensure each bucket has only 1 request and each request is + // ensure each bucket has only 1 batch request and each request is // isolated with others. - ASSERT_EQ(duplicator_impl->_inflights.size(), total_size); - ASSERT_EQ(duplicate_rpc::mail_box().size(), total_size); + ASSERT_EQ(duplicator_impl->_inflights.size(), batch_count); + ASSERT_EQ(duplicate_rpc::mail_box().size(), batch_count); for (const auto &ents : duplicator_impl->_inflights) { ASSERT_EQ(ents.second.size(), 0); } @@ -223,7 +247,9 @@ public: private: static uint64_t get_hash(const duplicate_rpc &rpc) { - return get_hash_from_request(rpc.request().task_code, rpc.request().raw_message); + auto size = rpc.request().entries.size(); + return get_hash_from_request(rpc.request().entries[size - 1].task_code, + rpc.request().entries[size - 1].raw_message); } }; @@ -322,9 +348,11 @@ TEST_F(pegasus_mutation_duplicator_test, duplicate_duplicate) // a duplicate from onebox2 dsn::apps::duplicate_request dup; - dup.cluster_id = 2; - dup.raw_message = data; - dup.timestamp = 200; + dsn::apps::duplicate_entry entry; + entry.cluster_id = 2; + entry.raw_message = data; + entry.timestamp = 200; + dup.entries.emplace_back(entry); msg = dsn::from_thrift_request_to_received_message(dup, dsn::apps::RPC_RRDB_RRDB_DUPLICATE); data = dsn::move_message_to_blob(msg.get()); diff --git a/src/server/test/pegasus_write_service_test.cpp b/src/server/test/pegasus_write_service_test.cpp index 839da27..9bd3d7f 100644 --- a/src/server/test/pegasus_write_service_test.cpp +++ b/src/server/test/pegasus_write_service_test.cpp @@ -220,8 +220,9 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched) } dsn::apps::duplicate_request duplicate; - duplicate.timestamp = 1000; - duplicate.cluster_id = 2; + dsn::apps::duplicate_entry entry; + entry.timestamp = 1000; + entry.cluster_id = 2; dsn::apps::duplicate_response resp; { @@ -233,9 +234,9 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched) } dsn::message_ptr mput_msg = pegasus::create_multi_put_request(mput); - duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_PUT; - duplicate.raw_message = dsn::move_message_to_blob(mput_msg.get()); - + entry.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_PUT; + entry.raw_message = dsn::move_message_to_blob(mput_msg.get()); + duplicate.entries.emplace_back(entry); _write_svc->duplicate(1, duplicate, resp); ASSERT_EQ(resp.error, 0); } @@ -248,8 +249,8 @@ TEST_F(pegasus_write_service_test, duplicate_not_batched) } dsn::message_ptr mremove_msg = pegasus::create_multi_remove_request(mremove); - duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE; - duplicate.raw_message = dsn::move_message_to_blob(mremove_msg.get()); + entry.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE; + entry.raw_message = dsn::move_message_to_blob(mremove_msg.get()); _write_svc->duplicate(1, duplicate, resp); ASSERT_EQ(resp.error, 0); @@ -270,8 +271,9 @@ TEST_F(pegasus_write_service_test, duplicate_batched) { dsn::apps::duplicate_request duplicate; - duplicate.timestamp = 1000; - duplicate.cluster_id = 2; + dsn::apps::duplicate_entry entry; + entry.timestamp = 1000; + entry.cluster_id = 2; dsn::apps::duplicate_response resp; for (int i = 0; i < kv_num; i++) { @@ -280,8 +282,9 @@ TEST_F(pegasus_write_service_test, duplicate_batched) request.value.assign(value[i].data(), 0, value[i].size()); dsn::message_ptr msg_ptr = pegasus::create_put_request(request); - duplicate.raw_message = dsn::move_message_to_blob(msg_ptr.get()); - duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; + entry.raw_message = dsn::move_message_to_blob(msg_ptr.get()); + entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; + duplicate.entries.emplace_back(entry); _write_svc->duplicate(1, duplicate, resp); ASSERT_EQ(resp.error, 0); } @@ -296,8 +299,10 @@ TEST_F(pegasus_write_service_test, illegal_duplicate_request) // cluster=13 is from nowhere dsn::apps::duplicate_request duplicate; - duplicate.cluster_id = 13; - duplicate.timestamp = 10; + dsn::apps::duplicate_entry entry; + entry.timestamp = 1000; + entry.cluster_id = 2; + duplicate.entries.emplace_back(entry); dsn::apps::duplicate_response resp; dsn::apps::update_request request; @@ -305,8 +310,8 @@ TEST_F(pegasus_write_service_test, illegal_duplicate_request) request.value.assign(value.data(), 0, value.size()); dsn::message_ptr msg_ptr = pegasus::create_put_request(request); // auto release memory - duplicate.raw_message = dsn::move_message_to_blob(msg_ptr.get()); - duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; + entry.raw_message = dsn::move_message_to_blob(msg_ptr.get()); + entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; _write_svc->duplicate(1, duplicate, resp); ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
