acelyc111 commented on code in PR #1908:
URL:
https://github.com/apache/incubator-pegasus/pull/1908#discussion_r1508543840
##########
src/common/duplication_common.cpp:
##########
@@ -37,6 +37,13 @@ DSN_DEFINE_uint32(replication,
"send mutation log batch bytes size per rpc");
DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);
+DSN_DEFINE_bool("replication",
+ force_send_no_idempotent_when_duplication,
+ false,
+ "receive client idempotent write requests and send them to
backup cluster when "
Review Comment:
non-idempotent
##########
src/server/pegasus_write_service.cpp:
##########
@@ -121,6 +121,11 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kRequests,
"The number of DUPLICATE requests");
+METRIC_DEFINE_counter(replica,
+ force_receive_no_idempotent_duplicate_qps,
Review Comment:
It's just a counter, it needs a duration to calculate the qps, so don't add
qps in the name. You can naming it like "xxx_requests"
##########
src/common/duplication_common.h:
##########
@@ -31,6 +31,7 @@
#include "utils/fmt_utils.h"
DSN_DECLARE_uint32(duplicate_log_batch_bytes);
+DSN_DECLARE_bool(force_send_no_idempotent_when_duplication);
Review Comment:
Is it necessary to add it here?
##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,75 @@ void
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
}
}
+void
pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc
&rpc)
+{
+
+ // there maybe more than one mutation in one dup rpc
+ if (FLAGS_force_send_no_idempotent_when_duplication) {
+ for (auto entry : rpc.request().entries) {
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
+ entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
+ entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
+
Review Comment:
Add these codes in a set, then:
```
if (entry.task_code not in set) {
continue;
}
METRIC_VAR_INCREMENT(dup_retry_no_idempotent_duplicate_qps);
switch (entry.task_code) {
case x:
...
case y:
...
}
```
##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,75 @@ void
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
}
}
+void
pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc
&rpc)
+{
+
+ // there maybe more than one mutation in one dup rpc
+ if (FLAGS_force_send_no_idempotent_when_duplication) {
+ for (auto entry : rpc.request().entries) {
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
+ entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
+ entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
+
+ METRIC_VAR_INCREMENT(dup_retry_no_idempotent_duplicate_qps);
+
+ dsn::message_ex *write =
+ dsn::from_blob_to_received_msg(entry.task_code,
entry.raw_message);
+
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+ incr_rpc raw_rpc(write);
+ absl::string_view
unmarshall_key(raw_rpc.request().key.data(),
Review Comment:
Is it possible to use `raw_rpc.request().key` directly in LOG_DEBUG?
##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,75 @@ void
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
}
}
+void
pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc
&rpc)
+{
+
+ // there maybe more than one mutation in one dup rpc
+ if (FLAGS_force_send_no_idempotent_when_duplication) {
Review Comment:
Reducce the indents.
```
if (!FLAGS_force_send_no_idempotent_when_duplication) {
return;
}
for (...) {
...
}
```
##########
src/server/pegasus_write_service.cpp:
##########
@@ -415,6 +421,40 @@ int pegasus_write_service::duplicate(int64_t decree,
}
continue;
}
+
+ if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
+ request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
+ request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
+ // receive no idempotent request from master cluster via
duplication
+ METRIC_VAR_INCREMENT(force_receive_no_idempotent_duplicate_qps);
+
+ if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+ incr_rpc rpc(write);
+ resp.__set_error(_impl->incr(ctx.decree, rpc.request(),
rpc.response()));
Review Comment:
Add some comments to describe what's you aim here.
##########
src/common/duplication_common.cpp:
##########
@@ -37,6 +37,13 @@ DSN_DEFINE_uint32(replication,
"send mutation log batch bytes size per rpc");
DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);
+DSN_DEFINE_bool("replication",
Review Comment:
```suggestion
DSN_DEFINE_bool(replication,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]