This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 8039277 fix: catch exception if unmarshall the rpc request encounters error (#790)
8039277 is described below
commit 80392779b27bff5562b23388f2a66aa6c50da38e
Author: zhao liwei <[email protected]>
AuthorDate: Wed Jul 28 15:47:27 2021 +0800
fix: catch exception if unmarshall the rpc request encounters error (#790)
---
src/server/pegasus_server_write.cpp | 65 +++++++++++++++++++++++++------------
src/server/pegasus_server_write.h | 2 ++
2 files changed, 46 insertions(+), 21 deletions(-)
diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp
index 9f4fc4a..ef41b2b 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -33,6 +33,13 @@ namespace server {
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool
verbose_log)
: replica_base(server), _write_svc(new pegasus_write_service(server)),
_verbose_log(verbose_log)
{
+ char name[256];
+ snprintf(name, 255, "recent_corrupt_write_count@%s",
get_gpid().to_string());
+ _pfc_recent_corrupt_write_count.init_app_counter("app.pegasus",
+ name,
+
COUNTER_TYPE_VOLATILE_NUMBER,
+ "statistic the recent
corrupt write count");
+
init_non_batch_write_handlers();
}
@@ -51,11 +58,20 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return _write_svc->empty_put(_decree);
}
- auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
- if (iter != _non_batch_write_handlers.end()) {
- dassert_f(count == 1, "count = {}", count);
- return iter->second(requests[0]);
+ try {
+ auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
+ if (iter != _non_batch_write_handlers.end()) {
+ dassert_f(count == 1, "count = {}", count);
+ return iter->second(requests[0]);
+ }
+ } catch (TTransportException ex) {
+ _pfc_recent_corrupt_write_count->increment();
+ derror_replica("pegasus not batch write handler failed, from = {},
exception = {}",
+ requests[0]->header->from_address.to_string(),
+ ex.what());
+ return 0;
}
+
return on_batched_writes(requests, count);
}
@@ -73,23 +89,30 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
// Make sure all writes are batched even if they are failed,
// since we need to record the total qps and rpc latencies,
// and respond for all RPCs regardless of their result.
-
int local_err = 0;
- dsn::task_code rpc_code(requests[i]->rpc_code());
- if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
- auto rpc = put_rpc::auto_reply(requests[i]);
- local_err = on_single_put_in_batch(rpc);
- _put_rpc_batch.emplace_back(std::move(rpc));
- } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
- auto rpc = remove_rpc::auto_reply(requests[i]);
- local_err = on_single_remove_in_batch(rpc);
- _remove_rpc_batch.emplace_back(std::move(rpc));
- } else {
- if (_non_batch_write_handlers.find(rpc_code) !=
_non_batch_write_handlers.end()) {
- dfatal_f("rpc code not allow batch: {}",
rpc_code.to_string());
+ try {
+ dsn::task_code rpc_code(requests[i]->rpc_code());
+ if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
+ auto rpc = put_rpc::auto_reply(requests[i]);
+ local_err = on_single_put_in_batch(rpc);
+ _put_rpc_batch.emplace_back(std::move(rpc));
+ } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
+ auto rpc = remove_rpc::auto_reply(requests[i]);
+ local_err = on_single_remove_in_batch(rpc);
+ _remove_rpc_batch.emplace_back(std::move(rpc));
} else {
- dfatal_f("rpc code not handled: {}", rpc_code.to_string());
+ if (_non_batch_write_handlers.find(rpc_code) !=
+ _non_batch_write_handlers.end()) {
+ dfatal_f("rpc code not allow batch: {}",
rpc_code.to_string());
+ } else {
+ dfatal_f("rpc code not handled: {}",
rpc_code.to_string());
+ }
}
+ } catch (TTransportException ex) {
+ _pfc_recent_corrupt_write_count->increment();
+ derror_replica("pegasus batch writes handler failed, from =
{}, exception = {}",
+ requests[i]->header->from_address.to_string(),
+ ex.what());
}
if (!err && local_err) {
@@ -97,10 +120,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
}
}
- if (err == 0) {
- err = _write_svc->batch_commit(_decree);
+ if (dsn_unlikely(err != 0 || _put_rpc_batch.size() +
_remove_rpc_batch.size() == 0)) {
+ _write_svc->batch_abort(_decree, err == 0 ? -1 : err);
} else {
- _write_svc->batch_abort(_decree, err);
+ err = _write_svc->batch_commit(_decree);
}
}
diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h
index c75d469..c73fc0d 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -89,6 +89,8 @@ private:
typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>>
non_batch_writes_map;
non_batch_writes_map _non_batch_write_handlers;
+
+ ::dsn::perf_counter_wrapper _pfc_recent_corrupt_write_count;
};
} // namespace server
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]