This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8534abc47976969600b09e19d74bee5393f61d8d Author: chenlinzhong <[email protected]> AuthorDate: Wed Feb 22 14:15:47 2023 +0800 [Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex block brpc::bthread (#16639) This mainly includes: - The brpc service adds two types of thread pools, "light" and "heavy", each with its own configurable number of threads. The interfaces of the BE are classified accordingly: those related to data transmission are heavy interfaces, and all others are light interfaces. - Add monitoring to the thread pools, including the queue size and the number of active threads. Use these indicators to guide the configuration of the number of threads. --- be/src/common/config.h | 14 +- be/src/service/internal_service.cpp | 922 +++++++++++++-------- be/src/service/internal_service.h | 9 +- be/src/util/doris_metrics.h | 10 + be/src/util/priority_thread_pool.hpp | 6 +- .../maint-monitor/monitor-metrics/metrics.md | 8 + gensrc/proto/internal_service.proto | 1 + 7 files changed, 623 insertions(+), 347 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index f141b1b8eb..49e8f09983 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060"); // port for brpc CONF_Int32(brpc_port, "8060"); -// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores +// the number of bthreads for brpc, the default value is set to -1, +// which means the number of bthreads is #cpu-cores CONF_Int32(brpc_num_threads, "-1"); // port to brpc server for single replica load @@ -399,8 +400,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64"); CONF_Int64(load_data_reserve_hours, "4"); // log error log will be removed after this time CONF_mInt64(load_error_log_reserve_hours, "48"); -CONF_Int32(number_tablet_writer_threads, "16"); -CONF_Int32(number_slave_replica_download_threads, "64"); + +// be brpc interface is classified into two categories: light
and heavy +// each category has a different number of threads +// threads to handle heavy api interface, such as transmit_data/transmit_block etc +CONF_Int32(brpc_heavy_work_pool_threads, "192"); +// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start +CONF_Int32(brpc_light_work_pool_threads, "32"); +CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240"); +CONF_Int32(brpc_light_work_pool_max_queue_size, "10240"); // The maximum amount of data that can be processed by a stream load CONF_mInt64(streaming_load_max_mb, "10240"); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 381782af85..5922fe6419 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -60,7 +60,15 @@ namespace doris { const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT); + +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT); bthread_key_t btls_key; @@ -94,16 +102,42 @@ private: PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) : _exec_env(exec_env), - _tablet_worker_pool(config::number_tablet_writer_threads, 10240, "tablet_writer"), - _slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240, - "replica_download") { -
REGISTER_HOOK_METRIC(add_batch_task_queue_size, - [this]() { return _tablet_worker_pool.get_queue_size(); }); + _heavy_work_pool(config::brpc_heavy_work_pool_threads, + config::brpc_heavy_work_pool_max_queue_size, "brpc_heavy"), + _light_work_pool(config::brpc_light_work_pool_threads, + config::brpc_light_work_pool_max_queue_size, "brpc_light") { + REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, + [this]() { return _heavy_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(light_work_pool_queue_size, + [this]() { return _light_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(heavy_work_active_threads, + [this]() { return _heavy_work_pool.get_active_threads(); }); + REGISTER_HOOK_METRIC(light_work_active_threads, + [this]() { return _light_work_pool.get_active_threads(); }); + + REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size, + []() { return config::brpc_heavy_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(light_work_pool_max_queue_size, + []() { return config::brpc_light_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(heavy_work_max_threads, + []() { return config::brpc_heavy_work_pool_threads; }); + REGISTER_HOOK_METRIC(light_work_max_threads, + []() { return config::brpc_light_work_pool_threads; }); + CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); } PInternalServiceImpl::~PInternalServiceImpl() { - DEREGISTER_HOOK_METRIC(add_batch_task_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(light_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_active_threads); + DEREGISTER_HOOK_METRIC(light_work_active_threads); + + DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_max_threads); + DEREGISTER_HOOK_METRIC(light_work_max_threads); + CHECK_EQ(0, bthread_key_delete(btls_key)); } @@ -131,7 +165,7 @@ void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController 
_transmit_data(cntl_base, new_request, response, new_done, st); } -void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done, @@ -169,23 +203,31 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() - << ", txn_id=" << request->txn_id(); - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->load_channel_mgr()->open(*request); - if (!st.ok()) { - LOG(WARNING) << "load channel open failed, message=" << st.get_error_msg() - << ", id=" << request->id() << ", index_id=" << request->index_id() - << ", txn_id=" << request->txn_id(); + bool ret = _light_work_pool.offer([this, request, response, done]() { + VLOG_RPC << "tablet writer open, id=" << request->id() + << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->load_channel_mgr()->open(*request); + if (!st.ok()) { + LOG(WARNING) << "load channel open failed, message=" << st.get_error_msg() + << ", id=" << request->id() << ", index_id=" << request->index_id() + << ", txn_id=" << request->txn_id(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* cntl_base, +void 
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base); + auto span = telemetry::start_rpc_server_span("exec_plan_fragment", controller); auto scope = OpentelemetryScope {span}; brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); @@ -199,67 +241,95 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - exec_plan_fragment(cntl_base, request, response, done); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + exec_plan_fragment(controller, request, response, done); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->fragment_mgr()->start_query_execution(request); - st.to_protobuf(result->mutable_status()); + bool ret = _light_work_pool.offer([this, controller, request, result, done]() { + auto 
span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->start_query_execution(request); + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl); + bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() { + // TODO(zxy) delete in 1.2 version + google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl); - _tablet_writer_add_block(cntl_base, request, response, new_done); + _tablet_writer_add_block(controller, request, response, new_done); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_add_block_by_http( - 
google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, + google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); - google::protobuf::Closure* new_done = - new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request, - cntl); - if (st.ok()) { - _tablet_writer_add_block(cntl_base, new_request, response, new_done); - } else { - st.to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.offer([this, controller, response, done]() { + PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); + google::protobuf::Closure* new_done = + new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>( + new_request, cntl); + if (st.ok()) { + _tablet_writer_add_block(controller, new_request, response, new_done); + } else { + st.to_protobuf(response->mutable_status()); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer add block, 
id=" << request->id() - << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() - << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); int64_t submit_task_time_ns = MonotonicNanos(); - _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() { + bool ret = _heavy_work_pool.offer([request, response, done, submit_task_time_ns, this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { LOG(WARNING) << "tablet writer add block failed, message=" << st.get_error_msg() @@ -272,6 +342,12 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base, @@ -303,13 +379,12 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { VLOG_RPC << "tablet writer add batch, id=" << request->id() - << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() - << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); + << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); // add batch maybe cost a lot of time, and this callback thread will be held. 
// this will influence query execution, because the pthreads under bthread may be // exhausted, so we put this to a local thread pool to process int64_t submit_task_time_ns = MonotonicNanos(); - _tablet_worker_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() { + bool ret = _heavy_work_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; @@ -332,20 +407,32 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->load_channel_mgr()->cancel(*request); - if (!st.ok()) { - LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() - << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); + bool ret = _light_work_pool.offer([this, request, done]() { + VLOG_RPC << "tablet writer cancel, id=" << request->id() + << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); + brpc::ClosureGuard closure_guard(done); + auto st = 
_exec_env->load_channel_mgr()->cancel(*request); + if (!st.ok()) { + LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() + << ", index_id=" << request->index_id() + << ", sender_id=" << request->sender_id(); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); } } @@ -377,309 +464,405 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, } } -void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller, const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - TUniqueId tid; - tid.__set_hi(request->finst_id().hi()); - tid.__set_lo(request->finst_id().lo()); - - Status st; - if (request->has_cancel_reason()) { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) - << ", reason: " << request->cancel_reason(); - st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); - } else { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); - st = _exec_env->fragment_mgr()->cancel(tid); - } - if (!st.ok()) { - LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st.get_error_msg(); + bool ret = _light_work_pool.offer([this, controller, request, result, done]() { + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + TUniqueId tid; + tid.__set_hi(request->finst_id().hi()); + tid.__set_lo(request->finst_id().lo()); + + Status st = Status::OK(); + if (request->has_cancel_reason()) { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid) + << ", reason: " << request->cancel_reason(); + _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); + } else { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); + _exec_env->fragment_mgr()->cancel(tid); + } + // TODO: the logic seems useless, cancel only return Status::OK. remove it + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(result->mutable_status()); } -void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); - _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + bool ret = _heavy_work_pool.offer([this, controller, request, result, done]() { + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, google::protobuf::Closure* done) { - VLOG_RPC << "fetch table schema"; - 
brpc::ClosureGuard closure_guard(done); - TFileScanRange file_scan_range; - Status st = Status::OK(); - { - const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); - uint32_t len = request->file_scan_range().size(); - st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + bool ret = _heavy_work_pool.offer([request, result, done]() { + VLOG_RPC << "fetch table schema"; + brpc::ClosureGuard closure_guard(done); + TFileScanRange file_scan_range; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); + uint32_t len = request->file_scan_range().size(); + st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg(); + st.to_protobuf(result->mutable_status()); + return; + } + } + if (file_scan_range.__isset.ranges == false) { + st = Status::InternalError("can not get TFileRangeDesc."); + st.to_protobuf(result->mutable_status()); + return; + } + if (file_scan_range.__isset.params == false) { + st = Status::InternalError("can not get TFileScanRangeParams."); + st.to_protobuf(result->mutable_status()); + return; + } + const TFileRangeDesc& range = file_scan_range.ranges.at(0); + const TFileScanRangeParams& params = file_scan_range.params; + + std::unique_ptr<vectorized::GenericReader> reader(nullptr); + std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema")); + switch (params.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_DEFLATE: { + // file_slots is no use + std::vector<SlotDescriptor*> file_slots; + reader.reset( + new vectorized::CsvReader(profile.get(), params, range, file_slots)); + break; + } + case TFileFormatType::FORMAT_PARQUET: { + reader.reset(new 
vectorized::ParquetReader(params, range)); + break; + } + case TFileFormatType::FORMAT_ORC: { + std::vector<std::string> column_names; + reader.reset(new vectorized::OrcReader(params, range, column_names, "")); + break; + } + case TFileFormatType::FORMAT_JSON: { + std::vector<SlotDescriptor*> file_slots; + reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots)); + break; + } + default: + st = Status::InternalError("Not supported file format in fetch table schema: {}", + params.format_type); + st.to_protobuf(result->mutable_status()); + return; + } + std::vector<std::string> col_names; + std::vector<TypeDescriptor> col_types; + st = reader->get_parsed_schema(&col_names, &col_types); if (!st.ok()) { LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg(); st.to_protobuf(result->mutable_status()); return; } - } - if (file_scan_range.__isset.ranges == false) { - st = Status::InternalError("can not get TFileRangeDesc."); - st.to_protobuf(result->mutable_status()); - return; - } - if (file_scan_range.__isset.params == false) { - st = Status::InternalError("can not get TFileScanRangeParams."); - st.to_protobuf(result->mutable_status()); - return; - } - const TFileRangeDesc& range = file_scan_range.ranges.at(0); - const TFileScanRangeParams& params = file_scan_range.params; - - std::unique_ptr<vectorized::GenericReader> reader(nullptr); - std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema")); - switch (params.format_type) { - case TFileFormatType::FORMAT_CSV_PLAIN: - case TFileFormatType::FORMAT_CSV_GZ: - case TFileFormatType::FORMAT_CSV_BZ2: - case TFileFormatType::FORMAT_CSV_LZ4FRAME: - case TFileFormatType::FORMAT_CSV_LZOP: - case TFileFormatType::FORMAT_CSV_DEFLATE: { - // file_slots is no use - std::vector<SlotDescriptor*> file_slots; - reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots)); - break; - } - case TFileFormatType::FORMAT_PARQUET: { - reader.reset(new 
vectorized::ParquetReader(params, range)); - break; - } - case TFileFormatType::FORMAT_ORC: { - std::vector<std::string> column_names; - reader.reset(new vectorized::OrcReader(params, range, column_names, "")); - break; - } - case TFileFormatType::FORMAT_JSON: { - std::vector<SlotDescriptor*> file_slots; - reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots)); - break; - } - default: - st = Status::InternalError("Not supported file format in fetch table schema: {}", - params.format_type); - st.to_protobuf(result->mutable_status()); - return; - } - std::vector<std::string> col_names; - std::vector<TypeDescriptor> col_types; - st = reader->get_parsed_schema(&col_names, &col_types); - if (!st.ok()) { - LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg(); + result->set_column_nums(col_names.size()); + for (size_t idx = 0; idx < col_names.size(); ++idx) { + result->add_column_names(col_names[idx]); + } + for (size_t idx = 0; idx < col_types.size(); ++idx) { + PTypeDesc* type_desc = result->add_column_types(); + col_types[idx].to_protobuf(type_desc); + } st.to_protobuf(result->mutable_status()); - return; - } - result->set_column_nums(col_names.size()); - for (size_t idx = 0; idx < col_names.size(); ++idx) { - result->add_column_names(col_names[idx]); - } - for (size_t idx = 0; idx < col_types.size(); ++idx) { - PTypeDesc* type_desc = result->add_column_types(); - col_types[idx].to_protobuf(type_desc); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(result->mutable_status()); } void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard 
closure_guard(done); - // PProxyRequest is defined in gensrc/proto/internal_service.proto - // Currently it supports 2 kinds of requests: - // 1. get all kafka partition ids for given topic - // 2. get all kafka partition offsets for given topic and timestamp. - if (request->has_kafka_meta_request()) { - const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); - if (!kafka_request.partition_id_for_latest_offsets().empty()) { - // get latest offsets for specified partition ids - std::vector<PIntegerPair> partition_offsets; - Status st = _exec_env->routine_load_task_executor() - ->get_kafka_latest_offsets_for_partitions( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + bool ret = _heavy_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + // PProxyRequest is defined in gensrc/proto/internal_service.proto + // Currently it supports 2 kinds of requests: + // 1. get all kafka partition ids for given topic + // 2. get all kafka partition offsets for given topic and timestamp. 
+ if (request->has_kafka_meta_request()) { + const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); + if (!kafka_request.partition_id_for_latest_offsets().empty()) { + // get latest offsets for specified partition ids + std::vector<PIntegerPair> partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_latest_offsets_for_partitions( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else if (!kafka_request.offset_times().empty()) { - // if offset_times() has elements, which means this request is to get offset by timestamp. - std::vector<PIntegerPair> partition_offsets; - Status st = - _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + st.to_protobuf(response->mutable_status()); + return; + } else if (!kafka_request.offset_times().empty()) { + // if offset_times() has elements, which means this request is to get offset by timestamp. 
+ std::vector<PIntegerPair> partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_partition_offsets_for_times( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else { - // get partition ids of topic - std::vector<int32_t> partition_ids; - Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( - request->kafka_meta_request(), &partition_ids); - if (st.ok()) { - PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); - for (int32_t id : partition_ids) { - kafka_result->add_partition_ids(id); + st.to_protobuf(response->mutable_status()); + return; + } else { + // get partition ids of topic + std::vector<int32_t> partition_ids; + Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( + request->kafka_meta_request(), &partition_ids); + if (st.ok()) { + PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); + for (int32_t id : partition_ids) { + kafka_result->add_partition_ids(id); + } } + st.to_protobuf(response->mutable_status()); + return; } - st.to_protobuf(response->mutable_status()); - return; } + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - Status::OK().to_protobuf(response->mutable_status()); } void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, 
PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->update(request, response); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->update(request, response); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->fetch(request, result); + bool ret = _heavy_work_pool.offer([this, request, result, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->fetch(request, result); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->clear(request, response); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->clear(request, response); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller, const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - 
auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "merge meet error" << st.to_string(); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "merge meet error" << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller, const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - UniqueId unique_id(request->query_id()); - VLOG_NOTICE << "rpc apply_filter recv"; - Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "apply filter meet error: " << st.to_string(); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + UniqueId unique_id(request->query_id()); + VLOG_NOTICE << "rpc apply_filter recv"; + Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "apply filter meet error: " << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - for (int i = 0; i < request->data_size(); ++i) { - PDataRow* row = new PDataRow(); - row->CopyFrom(request->data(i)); - pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row), - sizeof(row) + row->ByteSizeLong()); + bool ret = _heavy_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + 
response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + for (int i = 0; i < request->data_size(); ++i) { + PDataRow* row = new PDataRow(); + row->CopyFrom(request->data(i)); + pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row), + sizeof(row) + row->ByteSizeLong()); + } + response->mutable_status()->set_status_code(0); } - response->mutable_status()->set_status_code(0); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->finish(); - response->mutable_status()->set_status_code(0); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->finish(); + response->mutable_status()->set_status_code(0); + } + }); + if (!ret) 
{ + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->cancel("rollback"); - response->mutable_status()->set_status_code(0); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->cancel("rollback"); + response->mutable_status()->set_status_code(0); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* 
controller, const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - - Status st = Status::OK(); - if (request->has_request()) { + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = Status::OK(); st = _fold_constant_expr(request->request(), response); - } else { - // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15 - st = _fold_constant_expr(cntl->request_attachment().to_string(), response); - } - if (!st.ok()) { - LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st.get_error_msg(); + if (!st.ok()) { + LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st.get_error_msg(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request, @@ -696,31 +879,48 @@ Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request, return FoldConstantExecutor().fold_constant_vexpr(t_request, response); } -void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); - brpc::Controller* cntl = 
static_cast<brpc::Controller*>(cntl_base); - attachment_transfer_request_block<PTransmitDataParams>(request, cntl); + bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() { + // TODO(zxy) delete in 1.2 version + google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + attachment_transfer_request_block<PTransmitDataParams>(request, cntl); - _transmit_block(cntl_base, request, response, new_done, Status::OK()); + _transmit_block(controller, request, response, new_done, Status::OK()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller, const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - PTransmitDataParams* new_request = new PTransmitDataParams(); - google::protobuf::Closure* new_done = - new NewHttpClosure<PTransmitDataParams>(new_request, done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - Status st = attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl); - _transmit_block(cntl_base, new_request, response, new_done, st); + bool ret = _heavy_work_pool.offer([this, controller, response, done]() { + PTransmitDataParams* new_request = new PTransmitDataParams(); + google::protobuf::Closure* new_done = + new NewHttpClosure<PTransmitDataParams>(new_request, done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + Status st = + attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl); + 
_transmit_block(controller, new_request, response, new_done, st); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done, @@ -758,25 +958,34 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(0); - if (request->data().size() != request->size()) { - std::stringstream ss; - ss << "data size not same, expected: " << request->size() - << ", actual: " << request->data().size(); - response->mutable_status()->add_error_msgs(ss.str()); - response->mutable_status()->set_status_code(1); - - } else { - Md5Digest digest; - digest.update(static_cast<const void*>(request->data().c_str()), request->data().size()); - digest.digest(); - if (!iequal(digest.hex(), request->md5())) { + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + if (request->data().size() != request->size()) { std::stringstream ss; - ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex(); + ss << "data size not same, expected: " << request->size() + << ", actual: " << request->data().size(); response->mutable_status()->add_error_msgs(ss.str()); response->mutable_status()->set_status_code(1); + + } else { + Md5Digest digest; + 
digest.update(static_cast<const void*>(request->data().c_str()), + request->data().size()); + digest.digest(); + if (!iequal(digest.hex(), request->md5())) { + std::stringstream ss; + ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex(); + response->mutable_status()->add_error_msgs(ss.str()); + response->mutable_status()->set_status_code(1); + } } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } @@ -784,44 +993,60 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(0); - if (request->all()) { - int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size(); - if (size > 0) { - std::vector<std::string> endpoints; - ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints); - ExecEnv::GetInstance()->brpc_internal_client_cache()->clear(); - *response->mutable_channels() = {endpoints.begin(), endpoints.end()}; - } - } else { - for (const std::string& endpoint : request->endpoints()) { - if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) { - response->mutable_status()->add_error_msgs(endpoint + ": not found."); - continue; + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + if (request->all()) { + int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size(); + if (size > 0) { + std::vector<std::string> endpoints; + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints); + 
ExecEnv::GetInstance()->brpc_internal_client_cache()->clear(); + *response->mutable_channels() = {endpoints.begin(), endpoints.end()}; } + } else { + for (const std::string& endpoint : request->endpoints()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) { + response->mutable_status()->add_error_msgs(endpoint + ": not found."); + continue; + } - if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) { - response->add_channels(endpoint); - } else { - response->mutable_status()->add_error_msgs(endpoint + ": reset failed."); + if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) { + response->add_channels(endpoint); + } else { + response->mutable_status()->add_error_msgs(endpoint + ": reset failed."); + } + } + if (request->endpoints_size() != response->channels_size()) { + response->mutable_status()->set_status_code(1); } } - if (request->endpoints_size() != response->channels_size()) { - response->mutable_status()->set_status_code(1); - } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - if (request->has_hello()) { - response->set_hello(request->hello()); + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + if (request->has_hello()) { + response->set_hello(request->hello()); + } + response->mutable_status()->set_status_code(0); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work 
pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - response->mutable_status()->set_status_code(0); } void PInternalServiceImpl::request_slave_tablet_pull_rowset( @@ -836,7 +1061,8 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( int64_t brpc_port = request->brpc_port(); std::string token = request->token(); int64_t node_id = request->node_id(); - _slave_replica_worker_pool.offer([=]() { + bool ret = _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size, + http_port, token, rowset_path, this]() { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash()); if (tablet == nullptr) { @@ -979,6 +1205,12 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), rowset_meta->tablet_id(), node_id, true); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } Status::OK().to_protobuf(response->mutable_status()); } @@ -1037,14 +1269,22 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote void PInternalServiceImpl::response_slave_tablet_pull_rowset( google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request, PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - VLOG_CRITICAL - << "receive the result of slave replica pull rowset from slave replica. 
slave server=" - << request->node_id() << ", is_succeed=" << request->is_succeed() - << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); - StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset( - request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed()); - Status::OK().to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. " + "slave server=" + << request->node_id() << ", is_succeed=" << request->is_succeed() + << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); + StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset( + request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed()); + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 3ea3655974..e5855d98f3 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -194,8 +194,13 @@ private: private: ExecEnv* _exec_env; - PriorityThreadPool _tablet_worker_pool; - PriorityThreadPool _slave_replica_worker_pool; + + // every brpc service request should put into thread pool + // the reason see issue #16634 + // define the interface for reading and writing data as heavy interface + // otherwise as light interface + PriorityThreadPool _heavy_work_pool; + PriorityThreadPool _light_work_pool; }; } // namespace doris diff --git a/be/src/util/doris_metrics.h 
b/be/src/util/doris_metrics.h index 300c35c9ec..d5bd97fbe2 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -212,6 +212,16 @@ public: IntCounter* upload_rowset_count; IntCounter* upload_fail_count; + UIntGauge* light_work_pool_queue_size; + UIntGauge* heavy_work_pool_queue_size; + UIntGauge* heavy_work_active_threads; + UIntGauge* light_work_active_threads; + + UIntGauge* heavy_work_pool_max_queue_size; + UIntGauge* light_work_pool_max_queue_size; + UIntGauge* heavy_work_max_threads; + UIntGauge* light_work_max_threads; + static DorisMetrics* instance() { static DorisMetrics instance; return &instance; diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index 32fc637970..077b9757c8 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -54,7 +54,7 @@ public: // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name) - : _work_queue(queue_size), _shutdown(false), _name(name) { + : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { for (int i = 0; i < num_threads; ++i) { _threads.create_thread( std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i)); @@ -100,6 +100,7 @@ public: virtual void join() { _threads.join_all(); } virtual uint32_t get_queue_size() const { return _work_queue.get_size(); } + virtual uint32_t get_active_threads() const { return _active_threads; } // Blocks until the work queue is empty, and then calls shutdown to stop the worker // threads and Join to wait until they are finished. 
@@ -135,7 +136,9 @@ private: while (!is_shutdown()) { Task task; if (_work_queue.blocking_get(&task)) { + _active_threads++; task.work_function(); + _active_threads--; } if (_work_queue.get_size() == 0) { _empty_cv.notify_all(); @@ -150,6 +153,7 @@ private: // Set to true when threads should stop doing work and terminate. std::atomic<bool> _shutdown; std::string _name; + std::atomic<int> _active_threads; }; } // namespace doris diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md index a55cff98f7..791a66e8c7 100644 --- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md +++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md @@ -293,6 +293,14 @@ curl http://be_host:webserver_port/metrics?type=json |`doris_be_load_bytes`| | 字节|通过 tablet sink 发送的数量累计 | 可观测导入数据量 | P0 | |`doris_be_load_rows`| | Num | 通过 tablet sink 发送的行数累计| 可观测导入数据量 | P0 | |`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 | +|`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 | +|`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 | +|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 | +|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 | +|`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池队列最大长度,超过则阻塞提交work| | p0 | +|`doris_be_light_work_pool_queue_size`| | Num | brpc light线程池队列最大长度,超过则阻塞提交work| | p0 | +|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | p0 | +|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | p0 | ### 机器监控 diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 0396e8944b..be17516c64 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -243,6 +243,7 @@ enum PCacheStatus { INVALID_KEY_RANGE = 6; DATA_OVERDUE = 7; EMPTY_DATA = 8; + 
CANCELED = 9; }; enum CacheType { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
