This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5835e501b60cc3af75782eac7bb892a5f69f7d00 Author: Pxl <[email protected]> AuthorDate: Wed Aug 2 22:31:33 2023 +0800 [Chore](brpc) display pool name when try offer failed (#22514) --- be/src/service/internal_service.cpp | 172 ++++++++++------------------------- be/src/util/priority_thread_pool.hpp | 2 + 2 files changed, 51 insertions(+), 123 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 528331ee12..1f62a5bc9a 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -168,6 +168,23 @@ private: google::protobuf::Closure* _done = nullptr; }; +template <typename T> +concept CanCancel = requires(T* response) { response->mutable_status(); }; + +template <CanCancel T> +void offer_failed(T* response, google::protobuf::Closure* done, const std::string& pool_name) { + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool, pool=" + + pool_name); +} + +template <typename T> +void offer_failed(T* response, google::protobuf::Closure* done, const std::string& pool_name) { + brpc::ClosureGuard closure_guard(done); + LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool_name; +} + PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) : _exec_env(exec_env), _heavy_work_pool(config::brpc_heavy_work_pool_threads != -1 @@ -255,10 +272,7 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -270,10 +284,7 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c _exec_plan_fragment_in_pthread(controller, request, response, done); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -309,10 +320,7 @@ void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr _exec_plan_fragment_in_pthread(controller, request, response, done); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -328,10 +336,7 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl st.to_protobuf(result->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - result->mutable_status()->set_status_code(TStatusCode::CANCELLED); - result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(result, done, _light_work_pool.get_name()); } } @@ -343,10 +348,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll _tablet_writer_add_block(controller, request, response, done); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } } @@ -367,10 +369,7 @@ void PInternalServiceImpl::tablet_writer_add_block_by_http( } }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } } @@ -399,10 +398,7 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } } @@ -423,8 +419,7 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* } }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -501,10 +496,7 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* st.to_protobuf(result->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - result->mutable_status()->set_status_code(TStatusCode::CANCELLED); - result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(result, done, _light_work_pool.get_name()); } } @@ -517,10 +509,7 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controlle _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - result->mutable_status()->set_status_code(TStatusCode::CANCELLED); - result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(result, done, _heavy_work_pool.get_name()); } } @@ -624,10 +613,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c st.to_protobuf(result->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - result->mutable_status()->set_status_code(TStatusCode::CANCELLED); - result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(result, done, _heavy_work_pool.get_name()); } } @@ -654,10 +640,7 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co st.to_protobuf(response->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -669,10 +652,7 @@ void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcCon _get_column_ids_by_tablet_ids(controller, request, response, done); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -788,10 +768,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, Status::OK().to_protobuf(response->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } } @@ -803,9 +780,7 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control _exec_env->result_cache()->update(request, response); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->set_status(PCacheStatus::CANCELED); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -817,9 +792,7 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controll _exec_env->result_cache()->fetch(request, result); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - result->set_status(PCacheStatus::CANCELED); + offer_failed(result, done, _heavy_work_pool.get_name()); } } @@ -831,9 +804,7 @@ void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controll _exec_env->result_cache()->clear(request, response); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->set_status(PCacheStatus::CANCELED); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -852,10 +823,7 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr st.to_protobuf(response->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -876,10 +844,7 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr st.to_protobuf(response->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -900,10 +865,7 @@ void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController* con st.to_protobuf(response->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -936,10 +898,7 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller } }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } } @@ -962,10 +921,7 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, } }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -987,10 +943,7 @@ void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, } }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -1008,10 +961,7 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -1038,10 +988,7 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr _transmit_block(controller, request, response, done, Status::OK()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, pool.get_name()); } } @@ -1059,10 +1006,7 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle _transmit_block(controller, new_request, response, new_done, st); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } } @@ -1128,10 +1072,7 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co } }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -1169,10 +1110,7 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co } }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -1188,10 +1126,7 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controlle response->mutable_status()->set_status_code(0); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _light_work_pool.get_name()); } } @@ -1409,10 +1344,7 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( rowset_meta->tablet_id(), node_id, true); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } Status::OK().to_protobuf(response->mutable_status()); } @@ -1483,10 +1415,7 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset( Status::OK().to_protobuf(response->mutable_status()); }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } } @@ -1659,10 +1588,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; }); if (!ret) { - LOG(WARNING) << "fail to offer request to the work pool"; - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(TStatusCode::CANCELLED); - response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + offer_failed(response, done, _heavy_work_pool.get_name()); } } diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index 9e5e87b2cd..73685bcf7c 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -122,6 +122,8 @@ public: join(); } + std::string get_name() const { return _name; } + protected: virtual bool is_shutdown() { return _shutdown; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
