This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3d0d7a427b [Chore](brpc) display pool name when try offer failed (#22514)
3d0d7a427b is described below
commit 3d0d7a427b8a70376da80914314974a0d8f54545
Author: Pxl <[email protected]>
AuthorDate: Wed Aug 2 22:31:33 2023 +0800
[Chore](brpc) display pool name when try offer failed (#22514)
---
be/src/service/internal_service.cpp | 172 ++++++++++-------------------------
be/src/util/priority_thread_pool.hpp | 2 +
2 files changed, 51 insertions(+), 123 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 528331ee12..1f62a5bc9a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -168,6 +168,23 @@ private:
google::protobuf::Closure* _done = nullptr;
};
+template <typename T>
+concept CanCancel = requires(T* response) { response->mutable_status(); };
+
+template <CanCancel T>
+void offer_failed(T* response, google::protobuf::Closure* done, const std::string& pool_name) {
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to the work pool, pool=" +
+ pool_name);
+}
+
+template <typename T>
+void offer_failed(T* response, google::protobuf::Closure* done, const std::string& pool_name) {
+ brpc::ClosureGuard closure_guard(done);
+ LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool_name;
+}
+
PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
: _exec_env(exec_env),
_heavy_work_pool(config::brpc_heavy_work_pool_threads != -1
@@ -255,10 +272,7 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
st.to_protobuf(response->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -270,10 +284,7 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
_exec_plan_fragment_in_pthread(controller, request, response, done);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -309,10 +320,7 @@ void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr
_exec_plan_fragment_in_pthread(controller, request, response, done);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -328,10 +336,7 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
st.to_protobuf(result->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- result->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(result, done, _light_work_pool.get_name());
}
}
@@ -343,10 +348,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
_tablet_writer_add_block(controller, request, response, done);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
}
@@ -367,10 +369,7 @@ void PInternalServiceImpl::tablet_writer_add_block_by_http(
}
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
}
@@ -399,10 +398,7 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
}
@@ -423,8 +419,7 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
}
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -501,10 +496,7 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
st.to_protobuf(result->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- result->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(result, done, _light_work_pool.get_name());
}
}
@@ -517,10 +509,7 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controlle
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- result->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(result, done, _heavy_work_pool.get_name());
}
}
@@ -624,10 +613,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
st.to_protobuf(result->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- result->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(result, done, _heavy_work_pool.get_name());
}
}
@@ -654,10 +640,7 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
st.to_protobuf(response->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -669,10 +652,7 @@ void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcCon
_get_column_ids_by_tablet_ids(controller, request, response, done);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -788,10 +768,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
Status::OK().to_protobuf(response->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
}
@@ -803,9 +780,7 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control
_exec_env->result_cache()->update(request, response);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->set_status(PCacheStatus::CANCELED);
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -817,9 +792,7 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controll
_exec_env->result_cache()->fetch(request, result);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->set_status(PCacheStatus::CANCELED);
+ offer_failed(result, done, _heavy_work_pool.get_name());
}
}
@@ -831,9 +804,7 @@ void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controll
_exec_env->result_cache()->clear(request, response);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->set_status(PCacheStatus::CANCELED);
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -852,10 +823,7 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr
st.to_protobuf(response->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -876,10 +844,7 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr
st.to_protobuf(response->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -900,10 +865,7 @@ void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController* con
st.to_protobuf(response->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -936,10 +898,7 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller
}
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
}
@@ -962,10 +921,7 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
}
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -987,10 +943,7 @@ void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
}
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -1008,10 +961,7 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c
st.to_protobuf(response->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -1038,10 +988,7 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr
_transmit_block(controller, request, response, done, Status::OK());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, pool.get_name());
}
}
@@ -1059,10 +1006,7 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle
_transmit_block(controller, new_request, response, new_done, st);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
}
@@ -1128,10 +1072,7 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
}
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -1169,10 +1110,7 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
}
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -1188,10 +1126,7 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controlle
response->mutable_status()->set_status_code(0);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _light_work_pool.get_name());
}
}
@@ -1409,10 +1344,7 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
rowset_meta->tablet_id(), node_id, true);
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
Status::OK().to_protobuf(response->mutable_status());
}
@@ -1483,10 +1415,7 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset(
Status::OK().to_protobuf(response->mutable_status());
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
}
@@ -1659,10 +1588,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000;
});
if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+ offer_failed(response, done, _heavy_work_pool.get_name());
}
}
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 9e5e87b2cd..73685bcf7c 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -122,6 +122,8 @@ public:
join();
}
+ std::string get_name() const { return _name; }
+
protected:
virtual bool is_shutdown() { return _shutdown; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]