This is an automated email from the ASF dual-hosted git repository.
linzhongcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 0e3be4eff5 [Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex block brpc::bthread (#16639)
0e3be4eff5 is described below
commit 0e3be4eff59b8e4e3d21791a8bb7e8cb496d2f2f
Author: chenlinzhong <[email protected]>
AuthorDate: Wed Feb 22 14:15:47 2023 +0800
    [Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex block brpc::bthread (#16639)
    Main changes:
    - The brpc service now uses two kinds of thread pools, "light" and "heavy", each with its own
      number of threads. BE RPC interfaces are classified accordingly: interfaces related to data
      transmission are heavy, the rest are light (a sketch of the resulting handler pattern follows
      below).
    - Add monitoring for the thread pools, including the queue size and the number of active
      threads. These indicators help guide the configuration of the thread counts.
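
    For illustration only, here is a minimal, self-contained sketch of the handler pattern this
    change applies to each brpc method: hand the body to a work pool and, if the pool rejects the
    task, answer right away with a CANCELLED-style status. WorkPool, FakeResponse, handle_rpc and
    signal_done are hypothetical stand-ins (the real code uses doris's PriorityThreadPool, the
    generated protobuf responses and brpc::ClosureGuard); only offer(), get_queue_size() and
    get_active_threads() mirror the pool API this commit relies on.

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <vector>

    // Minimal stand-in for PriorityThreadPool: offer() is non-blocking here and
    // returns false when the bounded queue is full.
    class WorkPool {
    public:
        WorkPool(int num_threads, size_t max_queue) : _max_queue(max_queue) {
            for (int i = 0; i < num_threads; ++i) {
                _workers.emplace_back([this]() {
                    while (true) {
                        std::function<void()> task;
                        {
                            std::unique_lock<std::mutex> lock(_mutex);
                            _cv.wait(lock, [this]() { return _shutdown || !_queue.empty(); });
                            if (_shutdown && _queue.empty()) return;
                            task = std::move(_queue.front());
                            _queue.pop_front();
                        }
                        ++_active_threads;  // mirrors the counter added to PriorityThreadPool
                        task();
                        --_active_threads;
                    }
                });
            }
        }
        ~WorkPool() {
            { std::lock_guard<std::mutex> lock(_mutex); _shutdown = true; }
            _cv.notify_all();
            for (auto& t : _workers) t.join();
        }
        bool offer(std::function<void()> task) {
            {
                std::lock_guard<std::mutex> lock(_mutex);
                if (_queue.size() >= _max_queue) return false;
                _queue.push_back(std::move(task));
            }
            _cv.notify_one();
            return true;
        }
        size_t get_queue_size() {
            std::lock_guard<std::mutex> lock(_mutex);
            return _queue.size();
        }
        int get_active_threads() const { return _active_threads; }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        std::deque<std::function<void()>> _queue;
        std::vector<std::thread> _workers;
        std::atomic<int> _active_threads{0};
        size_t _max_queue;
        bool _shutdown = false;
    };

    // Hypothetical response type; the real handlers fill protobuf status fields instead.
    struct FakeResponse {
        int status_code = 0;  // 0 = OK, 1 stands in for TStatusCode::CANCELLED
        std::string error_msg;
    };

    // The pattern every converted handler follows: run the body on a pool thread so a
    // std::mutex held inside it blocks a pool pthread, not a brpc bthread; on rejection,
    // finish the RPC immediately with a CANCELLED-style status.
    void handle_rpc(WorkPool& pool, FakeResponse* response, std::function<void()> signal_done) {
        bool ret = pool.offer([response, signal_done]() {
            // ... the real work (e.g. opening a load channel) would run here ...
            response->status_code = 0;
            signal_done();  // plays the role of brpc::ClosureGuard running `done`
        });
        if (!ret) {
            std::cerr << "fail to offer request to the work pool" << std::endl;
            response->status_code = 1;
            response->error_msg = "fail to offer request to the work pool";
            signal_done();
        }
    }

    int main() {
        FakeResponse response;
        {
            WorkPool light_pool(/*num_threads=*/2, /*max_queue=*/8);  // cf. brpc_light_work_pool_threads
            handle_rpc(light_pool, &response, []() { std::cout << "done closure ran" << std::endl; });
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::cout << "queue_size=" << light_pool.get_queue_size()
                      << " active_threads=" << light_pool.get_active_threads() << std::endl;
        }  // the pool joins its workers here
        std::cout << "status=" << response.status_code << std::endl;
        return 0;
    }

    The point of the indirection is that any std::mutex taken inside the task body now blocks a
    pool pthread rather than the bthread worker that received the RPC.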
---
be/src/common/config.h | 14 +-
be/src/service/internal_service.cpp | 953 +++++++++++++--------
be/src/service/internal_service.h | 9 +-
be/src/util/doris_metrics.h | 10 +
be/src/util/priority_thread_pool.hpp | 6 +-
.../maint-monitor/monitor-metrics/metrics.md | 6 +
gensrc/proto/internal_service.proto | 1 +
7 files changed, 637 insertions(+), 362 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8516ea4fa9..f15d30bc7d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
// port for brpc
CONF_Int32(brpc_port, "8060");
-// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
+// the number of bthreads for brpc, the default value is set to -1,
+// which means the number of bthreads is #cpu-cores
CONF_Int32(brpc_num_threads, "-1");
// port to brpc server for single replica load
@@ -388,8 +389,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
CONF_Int64(load_data_reserve_hours, "4");
// log error log will be removed after this time
CONF_mInt64(load_error_log_reserve_hours, "48");
-CONF_Int32(number_tablet_writer_threads, "16");
-CONF_Int32(number_slave_replica_download_threads, "64");
+
+// be brpc interface is classified into two categories: light and heavy
+// each category has different thread number
+// threads to handle heavy api interface, such as transmit_data/transmit_block etc
+CONF_Int32(brpc_heavy_work_pool_threads, "192");
+// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
+CONF_Int32(brpc_light_work_pool_threads, "32");
+CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
+CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
// The maximum amount of data that can be processed by a stream load
CONF_mInt64(streaming_load_max_mb, "10240");
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index a8ed5ecfd8..3b271e4152 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -70,7 +70,15 @@ using namespace ErrorCode;
const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
bthread_key_t btls_key;
@@ -104,32 +112,58 @@ private:
 PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env),
-          _tablet_worker_pool(config::number_tablet_writer_threads, 10240, "tablet_writer"),
-          _slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240,
-                                     "replica_download") {
-    REGISTER_HOOK_METRIC(add_batch_task_queue_size,
-                         [this]() { return _tablet_worker_pool.get_queue_size(); });
+          _heavy_work_pool(config::brpc_heavy_work_pool_threads,
+                           config::brpc_heavy_work_pool_max_queue_size, "brpc_heavy"),
+          _light_work_pool(config::brpc_light_work_pool_threads,
+                           config::brpc_light_work_pool_max_queue_size, "brpc_light") {
+    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
+                         [this]() { return _heavy_work_pool.get_queue_size(); });
+    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
+                         [this]() { return _light_work_pool.get_queue_size(); });
+    REGISTER_HOOK_METRIC(heavy_work_active_threads,
+                         [this]() { return _heavy_work_pool.get_active_threads(); });
+    REGISTER_HOOK_METRIC(light_work_active_threads,
+                         [this]() { return _light_work_pool.get_active_threads(); });
+
+    REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
+                         []() { return config::brpc_heavy_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
+                         []() { return config::brpc_light_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(heavy_work_max_threads,
+                         []() { return config::brpc_heavy_work_pool_threads; });
+    REGISTER_HOOK_METRIC(light_work_max_threads,
+                         []() { return config::brpc_light_work_pool_threads; });
+
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
     CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter));
 }
PInternalServiceImpl::~PInternalServiceImpl() {
- DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
+ DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
+ DEREGISTER_HOOK_METRIC(light_work_active_threads);
+
+ DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
+ DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
+ DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
+ DEREGISTER_HOOK_METRIC(light_work_max_threads);
+
CHECK_EQ(0, bthread_key_delete(btls_key));
CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
}
-void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* controller,
                                          const PTransmitDataParams* request,
                                          PTransmitDataResult* response,
                                          google::protobuf::Closure* done) {}
-void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* controller,
                                                  const PEmptyRequest* request,
                                                  PTransmitDataResult* response,
                                                  google::protobuf::Closure* done) {}
-void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done,
@@ -139,22 +173,31 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
                                               const PTabletWriterOpenRequest* request,
                                               PTabletWriterOpenResult* response,
                                               google::protobuf::Closure* done) {
-    VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id()
-             << ", txn_id=" << request->txn_id();
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->load_channel_mgr()->open(*request);
-    if (!st.ok()) {
-        LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id()
-                     << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id();
+    bool ret = _light_work_pool.offer([this, request, response, done]() {
+        VLOG_RPC << "tablet writer open, id=" << request->id()
+                 << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id();
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->load_channel_mgr()->open(*request);
+        if (!st.ok()) {
+            LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id()
+                         << ", index_id=" << request->index_id()
+                         << ", txn_id=" << request->txn_id();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
-void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController*
cntl_base,
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController*
controller,
const PExecPlanFragmentRequest*
request,
PExecPlanFragmentResult*
response,
google::protobuf::Closure* done)
{
- auto span = telemetry::start_rpc_server_span("exec_plan_fragment",
cntl_base);
+ auto span = telemetry::start_rpc_server_span("exec_plan_fragment",
controller);
auto scope = OpentelemetryScope {span};
brpc::ClosureGuard closure_guard(done);
auto st = Status::OK();
@@ -168,67 +211,95 @@ void
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
st.to_protobuf(response->mutable_status());
}
-void
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
cntl_base,
+void
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
controller,
const
PExecPlanFragmentRequest* request,
PExecPlanFragmentResult*
response,
google::protobuf::Closure* done) {
- exec_plan_fragment(cntl_base, request, response, done);
+ bool ret = _light_work_pool.offer([this, controller, request, response,
done]() {
+ exec_plan_fragment(controller, request, response, done);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
void
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController*
controller,
const
PExecPlanFragmentStartRequest* request,
PExecPlanFragmentResult*
result,
google::protobuf::Closure*
done) {
- auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start",
controller);
- auto scope = OpentelemetryScope {span};
- brpc::ClosureGuard closure_guard(done);
- auto st = _exec_env->fragment_mgr()->start_query_execution(request);
- st.to_protobuf(result->mutable_status());
+ bool ret = _light_work_pool.offer([this, controller, request, result,
done]() {
+ auto span =
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+ auto scope = OpentelemetryScope {span};
+ brpc::ClosureGuard closure_guard(done);
+ auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+ st.to_protobuf(result->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
+ }
}
-void
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController*
cntl_base,
+void
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController*
controller,
const
PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure*
done) {
- // TODO(zxy) delete in 1.2 version
- google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request,
cntl);
+ bool ret = _heavy_work_pool.offer([this, controller, request, response,
done]() {
+ // TODO(zxy) delete in 1.2 version
+ google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
- _tablet_writer_add_block(cntl_base, request, response, new_done);
+ _tablet_writer_add_block(controller, request, response, new_done);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
void PInternalServiceImpl::tablet_writer_add_block_by_http(
- google::protobuf::RpcController* cntl_base, const
::doris::PEmptyRequest* request,
+ google::protobuf::RpcController* controller, const
::doris::PEmptyRequest* request,
PTabletWriterAddBlockResult* response, google::protobuf::Closure*
done) {
- PTabletWriterAddBlockRequest* new_request = new
PTabletWriterAddBlockRequest();
- google::protobuf::Closure* new_done =
- new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request,
done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- Status st =
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
-
cntl);
- if (st.ok()) {
- _tablet_writer_add_block(cntl_base, new_request, response, new_done);
- } else {
- st.to_protobuf(response->mutable_status());
+ bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
+ PTabletWriterAddBlockRequest* new_request = new
PTabletWriterAddBlockRequest();
+ google::protobuf::Closure* new_done =
+ new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request,
done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ Status st =
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(
+ new_request, cntl);
+ if (st.ok()) {
+ _tablet_writer_add_block(controller, new_request, response,
new_done);
+ } else {
+ st.to_protobuf(response->mutable_status());
+ }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
-void
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController*
cntl_base,
+void
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController*
controller,
const
PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure*
done) {
- VLOG_RPC << "tablet writer add block, id=" << request->id()
- << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id()
- << ", current_queued_size=" <<
_tablet_worker_pool.get_queue_size();
int64_t submit_task_time_ns = MonotonicNanos();
- _tablet_worker_pool.offer([request, response, done, submit_task_time_ns,
this]() {
+ bool ret = _heavy_work_pool.offer([request, response, done,
submit_task_time_ns, this]() {
int64_t wait_execution_time_ns = MonotonicNanos() -
submit_task_time_ns;
brpc::ClosureGuard closure_guard(done);
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
-
auto st = _exec_env->load_channel_mgr()->add_batch(*request,
response);
if (!st.ok()) {
LOG(WARNING) << "tablet writer add block failed, message=" <<
st
@@ -241,20 +312,32 @@ void
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
response->set_wait_execution_time_us(wait_execution_time_ns /
NANOS_PER_MICRO);
});
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
void
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
controller,
const
PTabletWriterCancelRequest* request,
PTabletWriterCancelResult*
response,
google::protobuf::Closure*
done) {
- VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id="
<< request->index_id()
- << ", sender_id=" << request->sender_id();
- brpc::ClosureGuard closure_guard(done);
- auto st = _exec_env->load_channel_mgr()->cancel(*request);
- if (!st.ok()) {
- LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
- << ", index_id=" << request->index_id()
- << ", sender_id=" << request->sender_id();
+ bool ret = _light_work_pool.offer([this, request, done]() {
+ VLOG_RPC << "tablet writer cancel, id=" << request->id()
+ << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id();
+ brpc::ClosureGuard closure_guard(done);
+ auto st = _exec_env->load_channel_mgr()->cancel(*request);
+ if (!st.ok()) {
+ LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
+ << ", index_id=" << request->index_id()
+ << ", sender_id=" << request->sender_id();
+ }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
}
}
@@ -298,125 +381,149 @@ Status PInternalServiceImpl::_exec_plan_fragment(const
std::string& ser_request,
}
}
-void
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
cntl_base,
+void
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
controller,
const
PCancelPlanFragmentRequest* request,
PCancelPlanFragmentResult*
result,
google::protobuf::Closure*
done) {
- auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start",
cntl_base);
- auto scope = OpentelemetryScope {span};
- brpc::ClosureGuard closure_guard(done);
- TUniqueId tid;
- tid.__set_hi(request->finst_id().hi());
- tid.__set_lo(request->finst_id().lo());
-
- Status st = Status::OK();
- if (request->has_cancel_reason()) {
- LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
- << ", reason: " << request->cancel_reason();
- _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
- } else {
- LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
- _exec_env->fragment_mgr()->cancel(tid);
+ bool ret = _light_work_pool.offer([this, controller, request, result,
done]() {
+ auto span =
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+ auto scope = OpentelemetryScope {span};
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId tid;
+ tid.__set_hi(request->finst_id().hi());
+ tid.__set_lo(request->finst_id().lo());
+
+ Status st = Status::OK();
+ if (request->has_cancel_reason()) {
+ LOG(INFO) << "cancel fragment, fragment_instance_id=" <<
print_id(tid)
+ << ", reason: " << request->cancel_reason();
+ _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+ } else {
+ LOG(INFO) << "cancel fragment, fragment_instance_id=" <<
print_id(tid);
+ _exec_env->fragment_mgr()->cancel(tid);
+ }
+ // TODO: the logic seems useless, cancel only return Status::OK.
remove it
+ st.to_protobuf(result->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
}
-
- // TODO: the logic seems useless, cancel only return Status::OK. remove it
- st.to_protobuf(result->mutable_status());
}
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController*
cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController*
controller,
const PFetchDataRequest* request,
PFetchDataResult* result,
google::protobuf::Closure* done) {
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
- _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+ bool ret = _heavy_work_pool.offer([this, controller, request, result,
done]() {
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+ _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
+ }
}
void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController*
controller,
const PFetchTableSchemaRequest*
request,
PFetchTableSchemaResult* result,
google::protobuf::Closure* done)
{
- VLOG_RPC << "fetch table schema";
- brpc::ClosureGuard closure_guard(done);
- TFileScanRange file_scan_range;
- Status st = Status::OK();
- {
- const uint8_t* buf = (const
uint8_t*)(request->file_scan_range().data());
- uint32_t len = request->file_scan_range().size();
- st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+ bool ret = _heavy_work_pool.offer([request, result, done]() {
+ VLOG_RPC << "fetch table schema";
+ brpc::ClosureGuard closure_guard(done);
+ TFileScanRange file_scan_range;
+ Status st = Status::OK();
+ {
+ const uint8_t* buf = (const
uint8_t*)(request->file_scan_range().data());
+ uint32_t len = request->file_scan_range().size();
+ st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+ if (!st.ok()) {
+ LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ }
+ if (file_scan_range.__isset.ranges == false) {
+ st = Status::InternalError("can not get TFileRangeDesc.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ if (file_scan_range.__isset.params == false) {
+ st = Status::InternalError("can not get TFileScanRangeParams.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+ const TFileScanRangeParams& params = file_scan_range.params;
+
+ std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+ std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema"));
+ IOContext io_ctx;
+ FileCacheStatistics file_cache_statis;
+ io_ctx.file_cache_stats = &file_cache_statis;
+ switch (params.format_type) {
+ case TFileFormatType::FORMAT_CSV_PLAIN:
+ case TFileFormatType::FORMAT_CSV_GZ:
+ case TFileFormatType::FORMAT_CSV_BZ2:
+ case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+ case TFileFormatType::FORMAT_CSV_LZOP:
+ case TFileFormatType::FORMAT_CSV_DEFLATE: {
+ // file_slots is no use
+ std::vector<SlotDescriptor*> file_slots;
+ reader.reset(
+ new vectorized::CsvReader(profile.get(), params, range,
file_slots, &io_ctx));
+ break;
+ }
+ case TFileFormatType::FORMAT_PARQUET: {
+ reader.reset(new vectorized::ParquetReader(params, range,
&io_ctx));
+ break;
+ }
+ case TFileFormatType::FORMAT_ORC: {
+ std::vector<std::string> column_names;
+ reader.reset(new vectorized::OrcReader(params, range,
column_names, "", &io_ctx));
+ break;
+ }
+ case TFileFormatType::FORMAT_JSON: {
+ std::vector<SlotDescriptor*> file_slots;
+ reader.reset(new vectorized::NewJsonReader(profile.get(), params,
range, file_slots,
+ &io_ctx));
+ break;
+ }
+ default:
+ st = Status::InternalError("Not supported file format in fetch
table schema: {}",
+ params.format_type);
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ std::vector<std::string> col_names;
+ std::vector<TypeDescriptor> col_types;
+ st = reader->get_parsed_schema(&col_names, &col_types);
if (!st.ok()) {
LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
st.to_protobuf(result->mutable_status());
return;
}
- }
- if (file_scan_range.__isset.ranges == false) {
- st = Status::InternalError("can not get TFileRangeDesc.");
- st.to_protobuf(result->mutable_status());
- return;
- }
- if (file_scan_range.__isset.params == false) {
- st = Status::InternalError("can not get TFileScanRangeParams.");
- st.to_protobuf(result->mutable_status());
- return;
- }
- const TFileRangeDesc& range = file_scan_range.ranges.at(0);
- const TFileScanRangeParams& params = file_scan_range.params;
-
- std::unique_ptr<vectorized::GenericReader> reader(nullptr);
- std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema"));
- IOContext io_ctx;
- FileCacheStatistics file_cache_statis;
- io_ctx.file_cache_stats = &file_cache_statis;
- switch (params.format_type) {
- case TFileFormatType::FORMAT_CSV_PLAIN:
- case TFileFormatType::FORMAT_CSV_GZ:
- case TFileFormatType::FORMAT_CSV_BZ2:
- case TFileFormatType::FORMAT_CSV_LZ4FRAME:
- case TFileFormatType::FORMAT_CSV_LZOP:
- case TFileFormatType::FORMAT_CSV_DEFLATE: {
- // file_slots is no use
- std::vector<SlotDescriptor*> file_slots;
- reader.reset(new vectorized::CsvReader(profile.get(), params, range,
file_slots, &io_ctx));
- break;
- }
- case TFileFormatType::FORMAT_PARQUET: {
- reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
- break;
- }
- case TFileFormatType::FORMAT_ORC: {
- std::vector<std::string> column_names;
- reader.reset(new vectorized::OrcReader(params, range, column_names,
"", &io_ctx));
- break;
- }
- case TFileFormatType::FORMAT_JSON: {
- std::vector<SlotDescriptor*> file_slots;
- reader.reset(
- new vectorized::NewJsonReader(profile.get(), params, range,
file_slots, &io_ctx));
- break;
- }
- default:
- st = Status::InternalError("Not supported file format in fetch table
schema: {}",
- params.format_type);
- st.to_protobuf(result->mutable_status());
- return;
- }
- std::vector<std::string> col_names;
- std::vector<TypeDescriptor> col_types;
- st = reader->get_parsed_schema(&col_names, &col_types);
- if (!st.ok()) {
- LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+ result->set_column_nums(col_names.size());
+ for (size_t idx = 0; idx < col_names.size(); ++idx) {
+ result->add_column_names(col_names[idx]);
+ }
+ for (size_t idx = 0; idx < col_types.size(); ++idx) {
+ PTypeDesc* type_desc = result->add_column_types();
+ col_types[idx].to_protobuf(type_desc);
+ }
st.to_protobuf(result->mutable_status());
- return;
- }
- result->set_column_nums(col_names.size());
- for (size_t idx = 0; idx < col_names.size(); ++idx) {
- result->add_column_names(col_names[idx]);
- }
- for (size_t idx = 0; idx < col_types.size(); ++idx) {
- PTypeDesc* type_desc = result->add_column_types();
- col_types[idx].to_protobuf(type_desc);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
}
- st.to_protobuf(result->mutable_status());
}
Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest*
request,
@@ -435,200 +542,278 @@ void
PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
const PTabletKeyLookupRequest*
request,
PTabletKeyLookupResponse*
response,
google::protobuf::Closure* done) {
- [[maybe_unused]] brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
- brpc::ClosureGuard guard(done);
- Status st = _tablet_fetch_data(request, response);
- st.to_protobuf(response->mutable_status());
+ bool ret = _heavy_work_pool.offer([this, controller, request, response,
done]() {
+ [[maybe_unused]] brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
+ brpc::ClosureGuard guard(done);
+ Status st = _tablet_fetch_data(request, response);
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
void PInternalServiceImpl::get_info(google::protobuf::RpcController*
controller,
const PProxyRequest* request,
PProxyResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- // PProxyRequest is defined in gensrc/proto/internal_service.proto
- // Currently it supports 2 kinds of requests:
- // 1. get all kafka partition ids for given topic
- // 2. get all kafka partition offsets for given topic and timestamp.
- if (request->has_kafka_meta_request()) {
- const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
- if (!kafka_request.partition_id_for_latest_offsets().empty()) {
- // get latest offsets for specified partition ids
- std::vector<PIntegerPair> partition_offsets;
- Status st = _exec_env->routine_load_task_executor()
- ->get_kafka_latest_offsets_for_partitions(
- request->kafka_meta_request(),
&partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ bool ret = _heavy_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ // PProxyRequest is defined in gensrc/proto/internal_service.proto
+ // Currently it supports 2 kinds of requests:
+ // 1. get all kafka partition ids for given topic
+ // 2. get all kafka partition offsets for given topic and timestamp.
+ if (request->has_kafka_meta_request()) {
+ const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
+ if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+ // get latest offsets for specified partition ids
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_latest_offsets_for_partitions(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else if (!kafka_request.offset_times().empty()) {
- // if offset_times() has elements, which means this request is to
get offset by timestamp.
- std::vector<PIntegerPair> partition_offsets;
- Status st =
-
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
- request->kafka_meta_request(), &partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else if (!kafka_request.offset_times().empty()) {
+ // if offset_times() has elements, which means this request is
to get offset by timestamp.
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_partition_offsets_for_times(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else {
- // get partition ids of topic
- std::vector<int32_t> partition_ids;
- Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
- request->kafka_meta_request(), &partition_ids);
- if (st.ok()) {
- PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
- for (int32_t id : partition_ids) {
- kafka_result->add_partition_ids(id);
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else {
+ // get partition ids of topic
+ std::vector<int32_t> partition_ids;
+ Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+ request->kafka_meta_request(), &partition_ids);
+ if (st.ok()) {
+ PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
+ for (int32_t id : partition_ids) {
+ kafka_result->add_partition_ids(id);
+ }
}
+ st.to_protobuf(response->mutable_status());
+ return;
}
- st.to_protobuf(response->mutable_status());
- return;
}
+ Status::OK().to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- Status::OK().to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::update_cache(google::protobuf::RpcController*
controller,
const PUpdateCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->update(request, response);
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->update(request, response);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController*
controller,
const PFetchCacheRequest* request,
PFetchCacheResult* result,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->fetch(request, result);
+ bool ret = _heavy_work_pool.offer([this, request, result, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->fetch(request, result);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::clear_cache(google::protobuf::RpcController*
controller,
const PClearCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->clear(request, response);
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->clear(request, response);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController*
controller,
const ::doris::PMergeFilterRequest*
request,
::doris::PMergeFilterResponse*
response,
::google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
- butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
- Status st = _exec_env->fragment_mgr()->merge_filter(request,
&zero_copy_input_stream);
- if (!st.ok()) {
- LOG(WARNING) << "merge meet error" << st.to_string();
+ bool ret = _light_work_pool.offer([this, controller, request, response,
done]() {
+ brpc::ClosureGuard closure_guard(done);
+ auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
+ butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+ Status st = _exec_env->fragment_mgr()->merge_filter(request,
&zero_copy_input_stream);
+ if (!st.ok()) {
+ LOG(WARNING) << "merge meet error" << st.to_string();
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- st.to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController*
controller,
const ::doris::PPublishFilterRequest*
request,
::doris::PPublishFilterResponse*
response,
::google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
- butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
- UniqueId unique_id(request->query_id());
- VLOG_NOTICE << "rpc apply_filter recv";
- Status st = _exec_env->fragment_mgr()->apply_filter(request,
&zero_copy_input_stream);
- if (!st.ok()) {
- LOG(WARNING) << "apply filter meet error: " << st.to_string();
+ bool ret = _light_work_pool.offer([this, controller, request, response,
done]() {
+ brpc::ClosureGuard closure_guard(done);
+ auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
+ butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+ UniqueId unique_id(request->query_id());
+ VLOG_NOTICE << "rpc apply_filter recv";
+ Status st = _exec_env->fragment_mgr()->apply_filter(request,
&zero_copy_input_stream);
+ if (!st.ok()) {
+ LOG(WARNING) << "apply filter meet error: " << st.to_string();
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- st.to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::send_data(google::protobuf::RpcController*
controller,
const PSendDataRequest* request,
PSendDataResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
-
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- for (int i = 0; i < request->data_size(); ++i) {
- PDataRow* row = new PDataRow();
- row->CopyFrom(request->data(i));
- pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
- sizeof(row) + row->ByteSizeLong());
+ bool ret = _heavy_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ for (int i = 0; i < request->data_size(); ++i) {
+ PDataRow* row = new PDataRow();
+ row->CopyFrom(request->data(i));
+ pipe->append_and_flush(reinterpret_cast<char*>(&row),
sizeof(row),
+ sizeof(row) + row->ByteSizeLong());
+ }
+ response->mutable_status()->set_status_code(0);
}
- response->mutable_status()->set_status_code(0);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
const PCommitRequest* request,
PCommitResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
-
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- pipe->finish();
- response->mutable_status()->set_status_code(0);
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ pipe->finish();
+ response->mutable_status()->set_status_code(0);
+ }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
void PInternalServiceImpl::rollback(google::protobuf::RpcController*
controller,
const PRollbackRequest* request,
PRollbackResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
-
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- pipe->cancel("rollback");
- response->mutable_status()->set_status_code(0);
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ pipe->cancel("rollback");
+ response->mutable_status()->set_status_code(0);
+ }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
-void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController*
cntl_base,
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController*
controller,
const PConstantExprRequest*
request,
PConstantExprResult* response,
google::protobuf::Closure* done)
{
- brpc::ClosureGuard closure_guard(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-
- Status st = Status::OK();
- if (request->has_request()) {
+ bool ret = _light_work_pool.offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ Status st = Status::OK();
st = _fold_constant_expr(request->request(), response);
- } else {
- // TODO(yangzhengguo) this is just for compatible with old version,
this should be removed in the release 0.15
- st = _fold_constant_expr(cntl->request_attachment().to_string(),
response);
- }
- if (!st.ok()) {
- LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+ if (!st.ok()) {
+ LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- st.to_protobuf(response->mutable_status());
}
Status PInternalServiceImpl::_fold_constant_expr(const std::string&
ser_request,
@@ -643,31 +828,48 @@ Status PInternalServiceImpl::_fold_constant_expr(const
std::string& ser_request,
return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
}
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController*
cntl_base,
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController*
controller,
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done) {
- // TODO(zxy) delete in 1.2 version
- google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+ bool ret = _heavy_work_pool.offer([this, controller, request, response,
done]() {
+ // TODO(zxy) delete in 1.2 version
+ google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
- _transmit_block(cntl_base, request, response, new_done, Status::OK());
+ _transmit_block(controller, request, response, new_done, Status::OK());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
-void
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController*
cntl_base,
+void
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController*
controller,
const PEmptyRequest* request,
PTransmitDataResult*
response,
google::protobuf::Closure*
done) {
- PTransmitDataParams* new_request = new PTransmitDataParams();
- google::protobuf::Closure* new_done =
- new NewHttpClosure<PTransmitDataParams>(new_request, done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- Status st =
attachment_extract_request_contain_block<PTransmitDataParams>(new_request,
cntl);
- _transmit_block(cntl_base, new_request, response, new_done, st);
+ bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
+ PTransmitDataParams* new_request = new PTransmitDataParams();
+ google::protobuf::Closure* new_done =
+ new NewHttpClosure<PTransmitDataParams>(new_request, done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ Status st =
+
attachment_extract_request_contain_block<PTransmitDataParams>(new_request,
cntl);
+ _transmit_block(controller, new_request, response, new_done, st);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController*
cntl_base,
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController*
controller,
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done,
@@ -705,25 +907,34 @@ void
PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
const PCheckRPCChannelRequest*
request,
PCheckRPCChannelResponse*
response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(0);
- if (request->data().size() != request->size()) {
- std::stringstream ss;
- ss << "data size not same, expected: " << request->size()
- << ", actual: " << request->data().size();
- response->mutable_status()->add_error_msgs(ss.str());
- response->mutable_status()->set_status_code(1);
-
- } else {
- Md5Digest digest;
- digest.update(static_cast<const void*>(request->data().c_str()),
request->data().size());
- digest.digest();
- if (!iequal(digest.hex(), request->md5())) {
+ bool ret = _light_work_pool.offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(0);
+ if (request->data().size() != request->size()) {
std::stringstream ss;
- ss << "md5 not same, expected: " << request->md5() << ", actual: "
<< digest.hex();
+ ss << "data size not same, expected: " << request->size()
+ << ", actual: " << request->data().size();
response->mutable_status()->add_error_msgs(ss.str());
response->mutable_status()->set_status_code(1);
+
+ } else {
+ Md5Digest digest;
+ digest.update(static_cast<const void*>(request->data().c_str()),
+ request->data().size());
+ digest.digest();
+ if (!iequal(digest.hex(), request->md5())) {
+ std::stringstream ss;
+ ss << "md5 not same, expected: " << request->md5() << ",
actual: " << digest.hex();
+ response->mutable_status()->add_error_msgs(ss.str());
+ response->mutable_status()->set_status_code(1);
+ }
}
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
@@ -731,44 +942,60 @@ void
PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
const PResetRPCChannelRequest*
request,
PResetRPCChannelResponse*
response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(0);
- if (request->all()) {
- int size =
ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
- if (size > 0) {
- std::vector<std::string> endpoints;
-
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
- ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
- *response->mutable_channels() = {endpoints.begin(),
endpoints.end()};
- }
- } else {
- for (const std::string& endpoint : request->endpoints()) {
- if
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
- response->mutable_status()->add_error_msgs(endpoint + ": not
found.");
- continue;
+ bool ret = _light_work_pool.offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(0);
+ if (request->all()) {
+ int size =
ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
+ if (size > 0) {
+ std::vector<std::string> endpoints;
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
+ *response->mutable_channels() = {endpoints.begin(),
endpoints.end()};
}
+ } else {
+ for (const std::string& endpoint : request->endpoints()) {
+ if
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
+ response->mutable_status()->add_error_msgs(endpoint + ":
not found.");
+ continue;
+ }
- if
(ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
- response->add_channels(endpoint);
- } else {
- response->mutable_status()->add_error_msgs(endpoint + ": reset
failed.");
+ if
(ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
+ response->add_channels(endpoint);
+ } else {
+ response->mutable_status()->add_error_msgs(endpoint + ":
reset failed.");
+ }
+ }
+ if (request->endpoints_size() != response->channels_size()) {
+ response->mutable_status()->set_status_code(1);
}
}
- if (request->endpoints_size() != response->channels_size()) {
- response->mutable_status()->set_status_code(1);
- }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
-void PInternalServiceImpl::hand_shake(google::protobuf::RpcController*
cntl_base,
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController*
controller,
const PHandShakeRequest* request,
PHandShakeResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- if (request->has_hello()) {
- response->set_hello(request->hello());
+ bool ret = _light_work_pool.offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ if (request->has_hello()) {
+ response->set_hello(request->hello());
+ }
+ response->mutable_status()->set_status_code(0);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- response->mutable_status()->set_status_code(0);
}
void PInternalServiceImpl::request_slave_tablet_pull_rowset(
@@ -783,8 +1010,8 @@ void
PInternalServiceImpl::request_slave_tablet_pull_rowset(
int64_t brpc_port = request->brpc_port();
std::string token = request->token();
int64_t node_id = request->node_id();
- _slave_replica_worker_pool.offer([rowset_meta_pb, host, brpc_port,
node_id, segments_size,
- http_port, token, rowset_path, this]() {
+ bool ret = _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port,
node_id, segments_size,
+ http_port, token, rowset_path, this]() {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
rowset_meta_pb.tablet_id(),
rowset_meta_pb.tablet_schema_hash());
if (tablet == nullptr) {
@@ -925,6 +1152,12 @@ void
PInternalServiceImpl::request_slave_tablet_pull_rowset(
_response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
rowset_meta->tablet_id(), node_id, true);
});
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
Status::OK().to_protobuf(response->mutable_status());
}
@@ -983,14 +1216,22 @@ void
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
void PInternalServiceImpl::response_slave_tablet_pull_rowset(
google::protobuf::RpcController* controller, const
PTabletWriteSlaveDoneRequest* request,
PTabletWriteSlaveDoneResult* response, google::protobuf::Closure*
done) {
- brpc::ClosureGuard closure_guard(done);
- VLOG_CRITICAL
- << "receive the result of slave replica pull rowset from slave
replica. slave server="
- << request->node_id() << ", is_succeed=" << request->is_succeed()
- << ", tablet_id=" << request->tablet_id() << ", txn_id=" <<
request->txn_id();
- StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
- request->txn_id(), request->tablet_id(), request->node_id(),
request->is_succeed());
- Status::OK().to_protobuf(response->mutable_status());
+ bool ret = _heavy_work_pool.offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ VLOG_CRITICAL << "receive the result of slave replica pull rowset from
slave replica. "
+ "slave server="
+ << request->node_id() << ", is_succeed=" <<
request->is_succeed()
+ << ", tablet_id=" << request->tablet_id() << ", txn_id="
<< request->txn_id();
+
StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
+ request->txn_id(), request->tablet_id(), request->node_id(),
request->is_succeed());
+ Status::OK().to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
static Status read_by_rowids(
@@ -1109,9 +1350,7 @@ void
PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
const PMultiGetRequest* request,
PMultiGetResponse* response,
google::protobuf::Closure* done) {
- // Submit task to seperate ThreadPool for avoiding block bthread working
pthread
- ThreadPool* task_pool =
StorageEngine::instance()->get_bg_multiget_threadpool();
- Status submit_st = task_pool->submit_func([request, response, done,
this]() {
+ bool ret = _heavy_work_pool.offer([request, response, done, this]() {
// multi get data by rowid
MonotonicStopWatch watch;
watch.start();
@@ -1121,9 +1360,11 @@ void
PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
st.to_protobuf(response->mutable_status());
LOG(INFO) << "multiget_data finished, cost(us):" <<
watch.elapsed_time() / 1000;
});
- if (!submit_st.ok()) {
- submit_st.to_protobuf(response->mutable_status());
- done->Run();
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 103293a745..4c500a245f 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -190,8 +190,13 @@ private:
private:
ExecEnv* _exec_env;
- PriorityThreadPool _tablet_worker_pool;
- PriorityThreadPool _slave_replica_worker_pool;
+
+ // every brpc service request should put into thread pool
+ // the reason see issue #16634
+ // define the interface for reading and writing data as heavy interface
+ // otherwise as light interface
+ PriorityThreadPool _heavy_work_pool;
+ PriorityThreadPool _light_work_pool;
};
} // namespace doris
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 87afed763b..4982862035 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -222,6 +222,16 @@ public:
IntCounter* upload_rowset_count;
IntCounter* upload_fail_count;
+ UIntGauge* light_work_pool_queue_size;
+ UIntGauge* heavy_work_pool_queue_size;
+ UIntGauge* heavy_work_active_threads;
+ UIntGauge* light_work_active_threads;
+
+ UIntGauge* heavy_work_pool_max_queue_size;
+ UIntGauge* light_work_pool_max_queue_size;
+ UIntGauge* heavy_work_max_threads;
+ UIntGauge* light_work_max_threads;
+
static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
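
The gauges declared above are filled by the REGISTER_HOOK_METRIC calls in the constructor shown
earlier; conceptually, each gauge is just a callback evaluated at scrape time. Below is a small,
self-contained sketch of that idea. The gauges map and the queue_size/active_threads variables are
hypothetical stand-ins, not the real doris metrics machinery; only the metric names and the
192-thread default come from this change.

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>

    int main() {
        // A gauge hook is just a callback sampled when metrics are collected.
        std::map<std::string, std::function<int64_t()>> gauges;

        int64_t queue_size = 3;      // stands in for _heavy_work_pool.get_queue_size()
        int64_t active_threads = 2;  // stands in for _heavy_work_pool.get_active_threads()

        // Mirrors REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, [this]() { ... });
        gauges["doris_be_heavy_work_pool_queue_size"] = [&]() { return queue_size; };
        gauges["doris_be_heavy_work_active_threads"] = [&]() { return active_threads; };
        gauges["doris_be_heavy_work_max_threads"] = []() -> int64_t { return 192; };  // default of brpc_heavy_work_pool_threads

        // A scrape simply evaluates every registered hook.
        for (const auto& [name, hook] : gauges) {
            std::cout << name << " = " << hook() << std::endl;
        }
        return 0;
    }

Comparing doris_be_*_work_pool_queue_size against its *_pool_max_queue_size counterpart (and
active threads against max threads) is how the commit intends these indicators to guide tuning of
brpc_heavy_work_pool_threads and brpc_light_work_pool_threads.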
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 3bbc53788c..7e2b8b5f77 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -55,7 +55,7 @@ public:
     // queue exceeds this size, subsequent calls to Offer will block until there is
     // capacity available.
     PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
-            : _work_queue(queue_size), _shutdown(false), _name(name) {
+            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
for (int i = 0; i < num_threads; ++i) {
_threads.create_thread(
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -101,6 +101,7 @@ public:
virtual void join() { _threads.join_all(); }
virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
+ virtual uint32_t get_active_threads() const { return _active_threads; }
    // Blocks until the work queue is empty, and then calls shutdown to stop the worker
    // threads and Join to wait until they are finished.
@@ -136,7 +137,9 @@ private:
while (!is_shutdown()) {
Task task;
if (_work_queue.blocking_get(&task)) {
+ _active_threads++;
task.work_function();
+ _active_threads--;
}
if (_work_queue.get_size() == 0) {
_empty_cv.notify_all();
@@ -151,6 +154,7 @@ private:
// Set to true when threads should stop doing work and terminate.
std::atomic<bool> _shutdown;
std::string _name;
+ std::atomic<int> _active_threads;
};
} // namespace doris
diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 3c7b49d6bd..0d542431f3 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -298,6 +298,12 @@ curl http://be_host:webserver_port/metrics?type=json
|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
|`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 |
|`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 |
+|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 |
+|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 |
+|`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池队列最大长度,超过则阻塞提交work| | p0 |
+|`doris_be_light_work_pool_queue_size`| | Num | brpc light线程池队列最大长度,超过则阻塞提交work| | p0 |
+|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | p0 |
+|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | p0 |
### 机器监控
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 984487ebd2..05746715c3 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -276,6 +276,7 @@ enum PCacheStatus {
INVALID_KEY_RANGE = 6;
DATA_OVERDUE = 7;
EMPTY_DATA = 8;
+ CANCELED = 9;
};
enum CacheType {