This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 9633af8d80142f21c06750abfada8ee0091ad518
Author: Jiashuo <[email protected]>
AuthorDate: Mon Mar 28 15:05:14 2022 +0800

    fix(dup_enhancement#22): change batch send config by using rdsn config 
value (#930)
---
 src/server/pegasus_mutation_duplicator.cpp         | 16 ++++----------
 src/server/pegasus_mutation_duplicator.h           |  2 +-
 .../test/pegasus_mutation_duplicator_test.cpp      | 25 +++++++++++++++++-----
 src/shell/main.cpp                                 |  1 +
 4 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/src/server/pegasus_mutation_duplicator.cpp 
b/src/server/pegasus_mutation_duplicator.cpp
index 6fc4ecf..c2513a9 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -23,7 +23,6 @@
 
 #include <dsn/cpp/message_utils.h>
 #include <dsn/utility/chrono_literals.h>
-#include <dsn/dist/replication/duplication_common.h>
 #include <rrdb/rrdb.client.h>
 
 namespace dsn {
@@ -42,11 +41,6 @@ namespace replication {
 namespace pegasus {
 namespace server {
 
-DSN_DEFINE_uint32("pegasus",
-                  duplicate_log_batch_megabytes,
-                  4,
-                  "send mutation log batch size per rpc");
-
 using namespace dsn::literals::chrono_literals;
 
 /*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob 
&data)
@@ -190,13 +184,11 @@ void 
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
     _total_shipped_size = 0;
 
     auto batch_request = dsn::make_unique<dsn::apps::duplicate_request>();
+    uint batch_count = 0;
     uint batch_bytes = 0;
-    int cur_count = 0;
-
     for (auto mut : muts) {
         // mut: 0=timestamp, 1=rpc_code, 2=raw_message
-
-        cur_count++;
+        batch_count++;
         dsn::task_code rpc_code = std::get<1>(mut);
         dsn::blob raw_message = std::get<2>(mut);
         auto dreq = dsn::make_unique<dsn::apps::duplicate_request>();
@@ -216,8 +208,8 @@ void 
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
             batch_bytes += raw_message.length();
         }
 
-        if (batch_bytes >= (FLAGS_duplicate_log_batch_megabytes << 20) ||
-            cur_count == muts.size()) {
+        if (batch_count == muts.size() ||
+            batch_bytes >= dsn::replication::FLAGS_duplicate_log_batch_bytes) {
             // since all the plog's mutations of replica belong to same gpid 
though the hash of
             // mutation is different, use the last mutation of one batch to 
get and represents the
             // current hash value, it will still send to remote correct replica
diff --git a/src/server/pegasus_mutation_duplicator.h 
b/src/server/pegasus_mutation_duplicator.h
index daeaf34..2d04cbd 100644
--- a/src/server/pegasus_mutation_duplicator.h
+++ b/src/server/pegasus_mutation_duplicator.h
@@ -23,12 +23,12 @@
 #include <dsn/dist/replication/replica_base.h>
 #include <rrdb/rrdb.code.definition.h>
 #include <dsn/utility/flags.h>
+#include <dsn/dist/replication/duplication_common.h>
 
 #include "client_lib/pegasus_client_factory_impl.h"
 
 namespace pegasus {
 namespace server {
-DSN_DECLARE_uint32(duplicate_log_batch_megabytes);
 
 using namespace dsn::literals::chrono_literals;
 
diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp 
b/src/server/test/pegasus_mutation_duplicator_test.cpp
index f61fe69..15239db 100644
--- a/src/server/test/pegasus_mutation_duplicator_test.cpp
+++ b/src/server/test/pegasus_mutation_duplicator_test.cpp
@@ -57,7 +57,8 @@ public:
 
         mutation_tuple_set muts;
         uint total_bytes = 0;
-        for (uint64_t i = 0; i < 4000; i++) {
+        uint batch_count = 0;
+        for (uint64_t i = 0; i < 400; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
 
@@ -69,8 +70,12 @@ public:
 
             muts.insert(std::make_tuple(ts, code, data));
             total_bytes += data.length();
+
+            if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+                batch_count++;
+                total_bytes = 0;
+            }
         }
-        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes 
<< 20) + 1;
 
         size_t total_shipped_size = 0;
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator 
*>(duplicator.get());
@@ -121,7 +126,8 @@ public:
 
         mutation_tuple_set muts;
         uint total_bytes = 0;
-        for (uint64_t i = 0; i < 4000; i++) {
+        uint batch_count = 0;
+        for (uint64_t i = 0; i < 400; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
 
@@ -133,8 +139,12 @@ public:
 
             muts.insert(std::make_tuple(ts, code, data));
             total_bytes += data.length();
+
+            if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+                batch_count++;
+                total_bytes = 0;
+            }
         }
-        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes 
<< 20) + 1;
 
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator 
*>(duplicator.get());
         RPC_MOCKING(duplicate_rpc)
@@ -193,6 +203,7 @@ public:
 
         mutation_tuple_set muts;
         uint total_bytes = 0;
+        uint batch_count = 0;
         for (uint64_t i = 0; i < total_size; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
@@ -205,8 +216,12 @@ public:
 
             muts.insert(std::make_tuple(ts, code, data));
             total_bytes += data.length();
+
+            if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+                batch_count++;
+                total_bytes = 0;
+            }
         }
-        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes 
<< 20) + 1;
 
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator 
*>(duplicator.get());
         RPC_MOCKING(duplicate_rpc)
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 308d54c..897e24a 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -449,6 +449,7 @@ static command_executor commands[] = {
         "[-s|--skip_prompt] [-o|--output file_name]",
         ddd_diagnose,
     },
+    // todo(jiashuo1) [-f|--freezed] is Deprecated, it will be removed later
     {"add_dup", "add duplication", "<app_name> <remote_cluster_name> 
[-f|--freezed]", add_dup},
     {"query_dup", "query duplication info", "<app_name> [-d|--detail]", 
query_dup},
     {"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup},

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to