acelyc111 commented on code in PR #1908:
URL:
https://github.com/apache/incubator-pegasus/pull/1908#discussion_r1524133408
##########
src/common/duplication_common.cpp:
##########
@@ -37,6 +37,15 @@ DSN_DEFINE_uint32(replication,
"send mutation log batch bytes size per rpc");
DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);
+DSN_DEFINE_bool(
+ replication,
+ duplication_unsafe_allow_non_idempotent,
+ false,
+ "Turn on the switch so that the cluster can accept non-idempotent writes
and forward these "
+ "writes via duplication "
+ "Note that this switch may cause data inconsistency between clusters. So
we say it is unsafe ");
Review Comment:
```suggestion
"writes via duplication. Note that this switch may cause data
inconsistency between "
"clusters. So we say it is unsafe.");
```
##########
src/common/duplication_common.h:
##########
@@ -31,6 +31,7 @@
#include "utils/fmt_utils.h"
DSN_DECLARE_uint32(duplicate_log_batch_bytes);
+DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent);
Review Comment:
Move these declarations to the places where they are actually used; don't put
them in header files.
##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,58 @@ void
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
}
}
+void
pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc
&rpc)
+{
+ if (!FLAGS_duplication_unsafe_allow_non_idempotent) {
+ return;
+ }
+
+ // there maybe more than one mutation in one dup rpc
+ for (auto entry : rpc.request().entries) {
+ // not a non idempotent request
+ if (!_non_idempotent_code.count(entry.task_code)) {
+ continue;
+ }
+
+ METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request);
+ dsn::message_ex *write =
dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);
+
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+ incr_rpc raw_rpc(write);
+
+ LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been
retried when doing "
+ "duplication,"
+ "key is [{}]",
+ raw_rpc.request().key);
+ continue;
+ }
+
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
+ check_and_set_rpc raw_rpc(write);
+
+ LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has
been retried "
+ "when doing duplication,"
+ "hash key [{}], check sort key [{}],"
+ "set sort key [{}]",
+ raw_rpc.request().hash_key,
+ raw_rpc.request().check_sort_key,
+ raw_rpc.request().set_sort_key);
+ continue;
+ }
+
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
+ check_and_mutate_rpc raw_rpc(write);
+
+ LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE
has been "
+ "retried when doing duplication,"
+ "hash key is [{}] , sort key is [{}] .",
Review Comment:
```suggestion
"retried when doing duplication, hash key is '{}',
sort key is '{}'.",
```
##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,58 @@ void
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
}
}
+void
pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc
&rpc)
+{
+ if (!FLAGS_duplication_unsafe_allow_non_idempotent) {
+ return;
+ }
+
+ // there maybe more than one mutation in one dup rpc
+ for (auto entry : rpc.request().entries) {
+ // not a non idempotent request
+ if (!_non_idempotent_code.count(entry.task_code)) {
+ continue;
+ }
+
+ METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request);
+ dsn::message_ex *write =
dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);
+
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+ incr_rpc raw_rpc(write);
+
+ LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been
retried when doing "
+ "duplication,"
+ "key is [{}]",
+ raw_rpc.request().key);
+ continue;
+ }
+
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
+ check_and_set_rpc raw_rpc(write);
+
+ LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has
been retried "
+ "when doing duplication,"
+ "hash key [{}], check sort key [{}],"
+ "set sort key [{}]",
Review Comment:
```suggestion
"when doing duplication, hash key '{}', check sort key
'{}', set sort "
"key '{}'",
```
##########
src/server/pegasus_mutation_duplicator.h:
##########
@@ -89,8 +94,14 @@ class pegasus_mutation_duplicator : public
dsn::replication::mutation_duplicator
size_t _total_shipped_size{0};
+ const std::set<dsn::task_code> _non_idempotent_code = {
Review Comment:
Try to reuse it in some other places, e.g.
src/server/pegasus_write_service.cpp
##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,58 @@ void
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
}
}
+void
pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc
&rpc)
+{
+ if (!FLAGS_duplication_unsafe_allow_non_idempotent) {
+ return;
+ }
+
+ // there maybe more than one mutation in one dup rpc
+ for (auto entry : rpc.request().entries) {
+ // not a non idempotent request
+ if (!_non_idempotent_code.count(entry.task_code)) {
+ continue;
+ }
+
+ METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request);
+ dsn::message_ex *write =
dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);
+
+ if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+ incr_rpc raw_rpc(write);
+
+ LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been
retried when doing "
+ "duplication,"
+ "key is [{}]",
Review Comment:
```suggestion
"duplication, key is '{}'",
```
##########
src/server/pegasus_mutation_duplicator.h:
##########
@@ -89,8 +94,14 @@ class pegasus_mutation_duplicator : public
dsn::replication::mutation_duplicator
size_t _total_shipped_size{0};
+ const std::set<dsn::task_code> _non_idempotent_code = {
Review Comment:
```suggestion
const std::set<dsn::task_code> _non_idempotent_codes = {
```
##########
src/server/pegasus_write_service.cpp:
##########
@@ -415,6 +421,44 @@ int pegasus_write_service::duplicate(int64_t decree,
}
continue;
}
+
+ // Parse non-idempotent writes via duplication
+ if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
+ request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
+ request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
+ // receive non-idempotent request from master cluster via
duplication when
+ // FLAG_duplication_unsafe_allow_non_idempotent set as true.
+ // This metric greater than zero means that there is already the
possibility of
+ // inconsistency between clusters
Review Comment:
Move these comments to the
dup_unsafe_received_non_idempotent_duplicate_request metric description, so that
users can learn what the metric means from the HTTP API, rather than having to
find the comments in the code.
##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -189,6 +211,9 @@ void
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
// retry this rpc
_inflights[hash].push_front(rpc);
_env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s);
Review Comment:
Will the non-idempotent request be retried twice here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]