acelyc111 commented on code in PR #1908:
URL: 
https://github.com/apache/incubator-pegasus/pull/1908#discussion_r1524133408


##########
src/common/duplication_common.cpp:
##########
@@ -37,6 +37,15 @@ DSN_DEFINE_uint32(replication,
                   "send mutation log batch bytes size per rpc");
 DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);
 
+DSN_DEFINE_bool(
+    replication,
+    duplication_unsafe_allow_non_idempotent,
+    false,
+    "Turn on the switch so that the cluster can accept non-idempotent writes 
and forward these "
+    "writes via duplication "
+    "Note that this switch may cause data inconsistency between clusters. So 
we say it is unsafe ");

Review Comment:
   ```suggestion
       "writes via duplication. Note that this switch may cause data inconsistency between "
       "clusters. So we say it is unsafe.");
   ```



##########
src/common/duplication_common.h:
##########
@@ -31,6 +31,7 @@
 #include "utils/fmt_utils.h"
 
 DSN_DECLARE_uint32(duplicate_log_batch_bytes);
+DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent);

Review Comment:
   Move these declarations to the places where they are actually used; don't put 
them in header files.



##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,58 @@ void 
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
     }
 }
 
+void 
pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc
 &rpc)
+{
+    if (!FLAGS_duplication_unsafe_allow_non_idempotent) {
+        return;
+    }
+
+    // there maybe more than one mutation in one dup rpc
+    for (auto entry : rpc.request().entries) {
+        // not a non idempotent request
+        if (!_non_idempotent_code.count(entry.task_code)) {
+            continue;
+        }
+
+        METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request);
+        dsn::message_ex *write = 
dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+            incr_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been 
retried when doing "
+                      "duplication,"
+                      "key is [{}]",
+                      raw_rpc.request().key);
+            continue;
+        }
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
+            check_and_set_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has 
been retried "
+                      "when doing duplication,"
+                      "hash key [{}], check sort key [{}],"
+                      "set sort key [{}]",
+                      raw_rpc.request().hash_key,
+                      raw_rpc.request().check_sort_key,
+                      raw_rpc.request().set_sort_key);
+            continue;
+        }
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
+            check_and_mutate_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE 
has been "
+                      "retried when doing duplication,"
+                      "hash key is [{}] , sort key is [{}] .",

Review Comment:
   ```suggestion
                      "retried when doing duplication, hash key is '{}', sort key is '{}'.",
   ```



##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,58 @@ void 
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
     }
 }
 
+void 
pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc
 &rpc)
+{
+    if (!FLAGS_duplication_unsafe_allow_non_idempotent) {
+        return;
+    }
+
+    // there maybe more than one mutation in one dup rpc
+    for (auto entry : rpc.request().entries) {
+        // not a non idempotent request
+        if (!_non_idempotent_code.count(entry.task_code)) {
+            continue;
+        }
+
+        METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request);
+        dsn::message_ex *write = 
dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+            incr_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been 
retried when doing "
+                      "duplication,"
+                      "key is [{}]",
+                      raw_rpc.request().key);
+            continue;
+        }
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
+            check_and_set_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has 
been retried "
+                      "when doing duplication,"
+                      "hash key [{}], check sort key [{}],"
+                      "set sort key [{}]",

Review Comment:
   ```suggestion
                      "when doing duplication, hash key '{}', check sort key '{}', set sort "
                      "key '{}'",
   ```



##########
src/server/pegasus_mutation_duplicator.h:
##########
@@ -89,8 +94,14 @@ class pegasus_mutation_duplicator : public 
dsn::replication::mutation_duplicator
 
     size_t _total_shipped_size{0};
 
+    const std::set<dsn::task_code> _non_idempotent_code = {

Review Comment:
   Try to reuse this set in other places as well, e.g. 
src/server/pegasus_write_service.cpp.



##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -205,6 +230,58 @@ void 
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
     }
 }
 
+void 
pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc
 &rpc)
+{
+    if (!FLAGS_duplication_unsafe_allow_non_idempotent) {
+        return;
+    }
+
+    // there maybe more than one mutation in one dup rpc
+    for (auto entry : rpc.request().entries) {
+        // not a non idempotent request
+        if (!_non_idempotent_code.count(entry.task_code)) {
+            continue;
+        }
+
+        METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request);
+        dsn::message_ex *write = 
dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+            incr_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been 
retried when doing "
+                      "duplication,"
+                      "key is [{}]",

Review Comment:
   ```suggestion
                         "duplication, key is '{}'",
   ```



##########
src/server/pegasus_mutation_duplicator.h:
##########
@@ -89,8 +94,14 @@ class pegasus_mutation_duplicator : public 
dsn::replication::mutation_duplicator
 
     size_t _total_shipped_size{0};
 
+    const std::set<dsn::task_code> _non_idempotent_code = {

Review Comment:
   ```suggestion
       const std::set<dsn::task_code> _non_idempotent_codes = {
   ```



##########
src/server/pegasus_write_service.cpp:
##########
@@ -415,6 +421,44 @@ int pegasus_write_service::duplicate(int64_t decree,
             }
             continue;
         }
+
+        // Parse non-idempotent writes via duplication
+        if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
+            request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
+            request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
+            // receive non-idempotent request from master cluster via 
duplication when
+            // FLAG_duplication_unsafe_allow_non_idempotent set as true.
+            // This metric greater than zero means that there is already the 
possibility of
+            // inconsistency between clusters

Review Comment:
   Move these comments into the description of the 
dup_unsafe_received_non_idempotent_duplicate_request metric, so that users can 
learn what it means from the HTTP API rather than having to find the comments 
in the code.



##########
src/server/pegasus_mutation_duplicator.cpp:
##########
@@ -189,6 +211,9 @@ void 
pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
             // retry this rpc
             _inflights[hash].push_front(rpc);
             _env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s);

Review Comment:
   Will the non-idempotent request be retried twice here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to