This is an automated email from the ASF dual-hosted git repository. jiashuo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 9633af8d80142f21c06750abfada8ee0091ad518 Author: Jiashuo <[email protected]> AuthorDate: Mon Mar 28 15:05:14 2022 +0800 fix(dup_enhancement#22): change batch send config by using rdsn config value (#930) --- src/server/pegasus_mutation_duplicator.cpp | 16 ++++---------- src/server/pegasus_mutation_duplicator.h | 2 +- .../test/pegasus_mutation_duplicator_test.cpp | 25 +++++++++++++++++----- src/shell/main.cpp | 1 + 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 6fc4ecf..c2513a9 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -23,7 +23,6 @@ #include <dsn/cpp/message_utils.h> #include <dsn/utility/chrono_literals.h> -#include <dsn/dist/replication/duplication_common.h> #include <rrdb/rrdb.client.h> namespace dsn { @@ -42,11 +41,6 @@ namespace replication { namespace pegasus { namespace server { -DSN_DEFINE_uint32("pegasus", - duplicate_log_batch_megabytes, - 4, - "send mutation log batch size per rpc"); - using namespace dsn::literals::chrono_literals; /*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob &data) @@ -190,13 +184,11 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb _total_shipped_size = 0; auto batch_request = dsn::make_unique<dsn::apps::duplicate_request>(); + uint batch_count = 0; uint batch_bytes = 0; - int cur_count = 0; - for (auto mut : muts) { // mut: 0=timestamp, 1=rpc_code, 2=raw_message - - cur_count++; + batch_count++; dsn::task_code rpc_code = std::get<1>(mut); dsn::blob raw_message = std::get<2>(mut); auto dreq = dsn::make_unique<dsn::apps::duplicate_request>(); @@ -216,8 +208,8 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb batch_bytes += raw_message.length(); } - if (batch_bytes >= (FLAGS_duplicate_log_batch_megabytes << 20) || - cur_count == muts.size()) { + if (batch_count == muts.size() || + batch_bytes >= dsn::replication::FLAGS_duplicate_log_batch_bytes) { // since all the plog's mutations of replica belong to same gpid though the hash of // mutation is different, use the last mutation of one batch to get and represents the // current hash value, it will still send to remote correct replica diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index daeaf34..2d04cbd 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -23,12 +23,12 @@ #include <dsn/dist/replication/replica_base.h> #include <rrdb/rrdb.code.definition.h> #include <dsn/utility/flags.h> +#include <dsn/dist/replication/duplication_common.h> #include "client_lib/pegasus_client_factory_impl.h" namespace pegasus { namespace server { -DSN_DECLARE_uint32(duplicate_log_batch_megabytes); using namespace dsn::literals::chrono_literals; diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index f61fe69..15239db 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -57,7 +57,8 @@ public: mutation_tuple_set muts; uint total_bytes = 0; - for (uint64_t i = 0; i < 4000; i++) { + uint batch_count = 0; + for (uint64_t i = 0; i < 400; i++) { uint64_t ts = 200 + i; dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; @@ -69,8 +70,12 @@ public: muts.insert(std::make_tuple(ts, code, data)); total_bytes += data.length(); + + if (total_bytes >= FLAGS_duplicate_log_batch_bytes) { + batch_count++; + total_bytes = 0; + } } - auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1; size_t total_shipped_size = 0; auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get()); @@ -121,7 +126,8 @@ public: mutation_tuple_set muts; uint total_bytes = 0; - for (uint64_t i = 0; i < 4000; i++) { + uint batch_count = 0; + for (uint64_t i = 0; i < 400; i++) { uint64_t ts = 200 + i; dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; @@ -133,8 +139,12 @@ public: muts.insert(std::make_tuple(ts, code, data)); total_bytes += data.length(); + + if (total_bytes >= FLAGS_duplicate_log_batch_bytes) { + batch_count++; + total_bytes = 0; + } } - auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1; auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get()); RPC_MOCKING(duplicate_rpc) @@ -193,6 +203,7 @@ public: mutation_tuple_set muts; uint total_bytes = 0; + uint batch_count = 0; for (uint64_t i = 0; i < total_size; i++) { uint64_t ts = 200 + i; dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; @@ -205,8 +216,12 @@ public: muts.insert(std::make_tuple(ts, code, data)); total_bytes += data.length(); + + if (total_bytes >= FLAGS_duplicate_log_batch_bytes) { + batch_count++; + total_bytes = 0; + } } - auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1; auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get()); RPC_MOCKING(duplicate_rpc) diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 308d54c..897e24a 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -449,6 +449,7 @@ static command_executor commands[] = { "[-s|--skip_prompt] [-o|--output file_name]", ddd_diagnose, }, + // todo(jiashuo1) [-f|--freezed] is Deprecated, it will be removed later {"add_dup", "add duplication", "<app_name> <remote_cluster_name> [-f|--freezed]", add_dup}, {"query_dup", "query duplication info", "<app_name> [-d|--detail]", query_dup}, {"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup}, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
