This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 8039277  fix: catch exception if unmarshall the rpc request encounters error (#790)
8039277 is described below

commit 80392779b27bff5562b23388f2a66aa6c50da38e
Author: zhao liwei <[email protected]>
AuthorDate: Wed Jul 28 15:47:27 2021 +0800

    fix: catch exception if unmarshall the rpc request encounters error (#790)
---
 src/server/pegasus_server_write.cpp | 65 +++++++++++++++++++++++++------------
 src/server/pegasus_server_write.h   |  2 ++
 2 files changed, 46 insertions(+), 21 deletions(-)

diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp
index 9f4fc4a..ef41b2b 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -33,6 +33,13 @@ namespace server {
 pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool 
verbose_log)
     : replica_base(server), _write_svc(new pegasus_write_service(server)), 
_verbose_log(verbose_log)
 {
+    char name[256];
+    snprintf(name, 255, "recent_corrupt_write_count@%s", 
get_gpid().to_string());
+    _pfc_recent_corrupt_write_count.init_app_counter("app.pegasus",
+                                                     name,
+                                                     
COUNTER_TYPE_VOLATILE_NUMBER,
+                                                     "statistic the recent 
corrupt write count");
+
     init_non_batch_write_handlers();
 }
 
@@ -51,11 +58,20 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
         return _write_svc->empty_put(_decree);
     }
 
-    auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
-    if (iter != _non_batch_write_handlers.end()) {
-        dassert_f(count == 1, "count = {}", count);
-        return iter->second(requests[0]);
+    try {
+        auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
+        if (iter != _non_batch_write_handlers.end()) {
+            dassert_f(count == 1, "count = {}", count);
+            return iter->second(requests[0]);
+        }
+    } catch (TTransportException ex) {
+        _pfc_recent_corrupt_write_count->increment();
+        derror_replica("pegasus not batch write handler failed, from = {}, 
exception = {}",
+                       requests[0]->header->from_address.to_string(),
+                       ex.what());
+        return 0;
     }
+
     return on_batched_writes(requests, count);
 }
 
@@ -73,23 +89,30 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
             // Make sure all writes are batched even if they are failed,
             // since we need to record the total qps and rpc latencies,
             // and respond for all RPCs regardless of their result.
-
             int local_err = 0;
-            dsn::task_code rpc_code(requests[i]->rpc_code());
-            if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
-                auto rpc = put_rpc::auto_reply(requests[i]);
-                local_err = on_single_put_in_batch(rpc);
-                _put_rpc_batch.emplace_back(std::move(rpc));
-            } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
-                auto rpc = remove_rpc::auto_reply(requests[i]);
-                local_err = on_single_remove_in_batch(rpc);
-                _remove_rpc_batch.emplace_back(std::move(rpc));
-            } else {
-                if (_non_batch_write_handlers.find(rpc_code) != 
_non_batch_write_handlers.end()) {
-                    dfatal_f("rpc code not allow batch: {}", 
rpc_code.to_string());
+            try {
+                dsn::task_code rpc_code(requests[i]->rpc_code());
+                if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
+                    auto rpc = put_rpc::auto_reply(requests[i]);
+                    local_err = on_single_put_in_batch(rpc);
+                    _put_rpc_batch.emplace_back(std::move(rpc));
+                } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
+                    auto rpc = remove_rpc::auto_reply(requests[i]);
+                    local_err = on_single_remove_in_batch(rpc);
+                    _remove_rpc_batch.emplace_back(std::move(rpc));
                 } else {
-                    dfatal_f("rpc code not handled: {}", rpc_code.to_string());
+                    if (_non_batch_write_handlers.find(rpc_code) !=
+                        _non_batch_write_handlers.end()) {
+                        dfatal_f("rpc code not allow batch: {}", 
rpc_code.to_string());
+                    } else {
+                        dfatal_f("rpc code not handled: {}", 
rpc_code.to_string());
+                    }
                 }
+            } catch (TTransportException ex) {
+                _pfc_recent_corrupt_write_count->increment();
+                derror_replica("pegasus batch writes handler failed, from = 
{}, exception = {}",
+                               requests[i]->header->from_address.to_string(),
+                               ex.what());
             }
 
             if (!err && local_err) {
@@ -97,10 +120,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
             }
         }
 
-        if (err == 0) {
-            err = _write_svc->batch_commit(_decree);
+        if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + 
_remove_rpc_batch.size() == 0)) {
+            _write_svc->batch_abort(_decree, err == 0 ? -1 : err);
         } else {
-            _write_svc->batch_abort(_decree, err);
+            err = _write_svc->batch_commit(_decree);
         }
     }
 
diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h
index c75d469..c73fc0d 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -89,6 +89,8 @@ private:
 
     typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> 
non_batch_writes_map;
     non_batch_writes_map _non_batch_write_handlers;
+
+    ::dsn::perf_counter_wrapper _pfc_recent_corrupt_write_count;
 };
 
 } // namespace server

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to