This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new df860e1db6 Revert "[Improvement](brpc) Using a thread pool for RPC
service avoiding std::mutex block brpc::bthread (#16639)" (#17198)
df860e1db6 is described below
commit df860e1db64d83ffae630decbc08fd2f151cf387
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Feb 27 20:50:39 2023 +0800
Revert "[Improvement](brpc) Using a thread pool for RPC service avoiding
std::mutex block brpc::bthread (#16639)" (#17198)
This reverts commit 8534abc47976969600b09e19d74bee5393f61d8d.
This PR has a potential deadlock issue, so revert it from the 1.2 branch.
---
be/src/common/config.h | 14 +-
be/src/service/internal_service.cpp | 922 ++++++++-------------
be/src/service/internal_service.h | 9 +-
be/src/util/doris_metrics.h | 10 -
be/src/util/priority_thread_pool.hpp | 6 +-
.../maint-monitor/monitor-metrics/metrics.md | 6 -
gensrc/proto/internal_service.proto | 1 -
7 files changed, 347 insertions(+), 621 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b278a9c55c..4a97529abc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,8 +35,7 @@ CONF_Int32(be_port, "9060");
// port for brpc
CONF_Int32(brpc_port, "8060");
-// the number of bthreads for brpc, the default value is set to -1,
-// which means the number of bthreads is #cpu-cores
+// the number of bthreads for brpc, the default value is set to -1, which
means the number of bthreads is #cpu-cores
CONF_Int32(brpc_num_threads, "-1");
// port to brpc server for single replica load
@@ -400,15 +399,8 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
CONF_Int64(load_data_reserve_hours, "4");
// log error log will be removed after this time
CONF_mInt64(load_error_log_reserve_hours, "48");
-
-// be brpc interface is classified into two categories: light and heavy
-// each category has diffrent thread number
-// threads to handle heavy api interface, such as transmit_data/transmit_block
etc
-CONF_Int32(brpc_heavy_work_pool_threads, "192");
-// threads to handle light api interface, such as
exec_plan_fragment_prepare/exec_plan_fragment_start
-CONF_Int32(brpc_light_work_pool_threads, "32");
-CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
-CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
+CONF_Int32(number_tablet_writer_threads, "16");
+CONF_Int32(number_slave_replica_download_threads, "64");
// The maximum amount of data that can be processed by a stream load
CONF_mInt64(streaming_load_max_mb, "10240");
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index aa3510452d..381782af85 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -60,15 +60,7 @@ namespace doris {
const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads,
MetricUnit::NOUNIT);
-
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size,
MetricUnit::NOUNIT);
bthread_key_t btls_key;
@@ -102,42 +94,16 @@ private:
PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
: _exec_env(exec_env),
- _heavy_work_pool(config::brpc_heavy_work_pool_threads,
- config::brpc_heavy_work_pool_max_queue_size,
"brpc_heavy"),
- _light_work_pool(config::brpc_light_work_pool_threads,
- config::brpc_light_work_pool_max_queue_size,
"brpc_light") {
- REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
- [this]() { return _heavy_work_pool.get_queue_size();
});
- REGISTER_HOOK_METRIC(light_work_pool_queue_size,
- [this]() { return _light_work_pool.get_queue_size();
});
- REGISTER_HOOK_METRIC(heavy_work_active_threads,
- [this]() { return
_heavy_work_pool.get_active_threads(); });
- REGISTER_HOOK_METRIC(light_work_active_threads,
- [this]() { return
_light_work_pool.get_active_threads(); });
-
- REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
- []() { return
config::brpc_heavy_work_pool_max_queue_size; });
- REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
- []() { return
config::brpc_light_work_pool_max_queue_size; });
- REGISTER_HOOK_METRIC(heavy_work_max_threads,
- []() { return config::brpc_heavy_work_pool_threads;
});
- REGISTER_HOOK_METRIC(light_work_max_threads,
- []() { return config::brpc_light_work_pool_threads;
});
-
+ _tablet_worker_pool(config::number_tablet_writer_threads, 10240,
"tablet_writer"),
+
_slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240,
+ "replica_download") {
+ REGISTER_HOOK_METRIC(add_batch_task_queue_size,
+ [this]() { return
_tablet_worker_pool.get_queue_size(); });
CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
}
PInternalServiceImpl::~PInternalServiceImpl() {
- DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
- DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
- DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
- DEREGISTER_HOOK_METRIC(light_work_active_threads);
-
- DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
- DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
- DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
- DEREGISTER_HOOK_METRIC(light_work_max_threads);
-
+ DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
CHECK_EQ(0, bthread_key_delete(btls_key));
}
@@ -165,7 +131,7 @@ void
PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController
_transmit_data(cntl_base, new_request, response, new_done, st);
}
-void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController*
cntl_base,
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done,
@@ -203,31 +169,23 @@ void
PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
const PTabletWriterOpenRequest*
request,
PTabletWriterOpenResult*
response,
google::protobuf::Closure* done)
{
- bool ret = _light_work_pool.offer([this, request, response, done]() {
- VLOG_RPC << "tablet writer open, id=" << request->id()
- << ", index_id=" << request->index_id() << ", txn_id=" <<
request->txn_id();
- brpc::ClosureGuard closure_guard(done);
- auto st = _exec_env->load_channel_mgr()->open(*request);
- if (!st.ok()) {
- LOG(WARNING) << "load channel open failed, message=" <<
st.get_error_msg()
- << ", id=" << request->id() << ", index_id=" <<
request->index_id()
- << ", txn_id=" << request->txn_id();
- }
- st.to_protobuf(response->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" <<
request->index_id()
+ << ", txn_id=" << request->txn_id();
+ brpc::ClosureGuard closure_guard(done);
+ auto st = _exec_env->load_channel_mgr()->open(*request);
+ if (!st.ok()) {
+ LOG(WARNING) << "load channel open failed, message=" <<
st.get_error_msg()
+ << ", id=" << request->id() << ", index_id=" <<
request->index_id()
+ << ", txn_id=" << request->txn_id();
}
+ st.to_protobuf(response->mutable_status());
}
-void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController*
cntl_base,
const PExecPlanFragmentRequest*
request,
PExecPlanFragmentResult*
response,
google::protobuf::Closure* done)
{
- auto span = telemetry::start_rpc_server_span("exec_plan_fragment",
controller);
+ auto span = telemetry::start_rpc_server_span("exec_plan_fragment",
cntl_base);
auto scope = OpentelemetryScope {span};
brpc::ClosureGuard closure_guard(done);
auto st = Status::OK();
@@ -241,95 +199,67 @@ void
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
st.to_protobuf(response->mutable_status());
}
-void
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
controller,
+void
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
cntl_base,
const
PExecPlanFragmentRequest* request,
PExecPlanFragmentResult*
response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([this, controller, request, response,
done]() {
- exec_plan_fragment(controller, request, response, done);
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
- }
+ exec_plan_fragment(cntl_base, request, response, done);
}
void
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController*
controller,
const
PExecPlanFragmentStartRequest* request,
PExecPlanFragmentResult*
result,
google::protobuf::Closure*
done) {
- bool ret = _light_work_pool.offer([this, controller, request, result,
done]() {
- auto span =
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
- auto scope = OpentelemetryScope {span};
- brpc::ClosureGuard closure_guard(done);
- auto st = _exec_env->fragment_mgr()->start_query_execution(request);
- st.to_protobuf(result->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
- }
+ auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start",
controller);
+ auto scope = OpentelemetryScope {span};
+ brpc::ClosureGuard closure_guard(done);
+ auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+ st.to_protobuf(result->mutable_status());
}
-void
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController*
controller,
+void
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController*
cntl_base,
const
PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure*
done) {
- bool ret = _heavy_work_pool.offer([this, controller, request, response,
done]() {
- // TODO(zxy) delete in 1.2 version
- google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
+ // TODO(zxy) delete in 1.2 version
+ google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+ attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request,
cntl);
- _tablet_writer_add_block(controller, request, response, new_done);
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
- }
+ _tablet_writer_add_block(cntl_base, request, response, new_done);
}
void PInternalServiceImpl::tablet_writer_add_block_by_http(
- google::protobuf::RpcController* controller, const
::doris::PEmptyRequest* request,
+ google::protobuf::RpcController* cntl_base, const
::doris::PEmptyRequest* request,
PTabletWriterAddBlockResult* response, google::protobuf::Closure*
done) {
- bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
- PTabletWriterAddBlockRequest* new_request = new
PTabletWriterAddBlockRequest();
- google::protobuf::Closure* new_done =
- new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request,
done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
- Status st =
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(
- new_request, cntl);
- if (st.ok()) {
- _tablet_writer_add_block(controller, new_request, response,
new_done);
- } else {
- st.to_protobuf(response->mutable_status());
- }
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ PTabletWriterAddBlockRequest* new_request = new
PTabletWriterAddBlockRequest();
+ google::protobuf::Closure* new_done =
+ new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request,
done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+ Status st =
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
+
cntl);
+ if (st.ok()) {
+ _tablet_writer_add_block(cntl_base, new_request, response, new_done);
+ } else {
+ st.to_protobuf(response->mutable_status());
}
}
-void
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController*
controller,
+void
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController*
cntl_base,
const
PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure*
done) {
+ VLOG_RPC << "tablet writer add block, id=" << request->id()
+ << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id()
+ << ", current_queued_size=" <<
_tablet_worker_pool.get_queue_size();
int64_t submit_task_time_ns = MonotonicNanos();
- bool ret = _heavy_work_pool.offer([request, response, done,
submit_task_time_ns, this]() {
+ _tablet_worker_pool.offer([request, response, done, submit_task_time_ns,
this]() {
int64_t wait_execution_time_ns = MonotonicNanos() -
submit_task_time_ns;
brpc::ClosureGuard closure_guard(done);
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
+
auto st = _exec_env->load_channel_mgr()->add_batch(*request,
response);
if (!st.ok()) {
LOG(WARNING) << "tablet writer add block failed, message=" <<
st.get_error_msg()
@@ -342,12 +272,6 @@ void
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
response->set_wait_execution_time_us(wait_execution_time_ns /
NANOS_PER_MICRO);
});
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
- }
}
void
PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController*
cntl_base,
@@ -379,13 +303,13 @@ void
PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
PTabletWriterAddBatchResult* response,
google::protobuf::Closure*
done) {
VLOG_RPC << "tablet writer add batch, id=" << request->id()
- << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id();
+ << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id()
+ << ", current_queued_size=" <<
_tablet_worker_pool.get_queue_size();
// add batch maybe cost a lot of time, and this callback thread will be
held.
// this will influence query execution, because the pthreads under bthread
may be
// exhausted, so we put this to a local thread pool to process
int64_t submit_task_time_ns = MonotonicNanos();
- bool ret = _heavy_work_pool.offer([cntl_base, request, response, done,
submit_task_time_ns,
- this]() {
+ _tablet_worker_pool.offer([cntl_base, request, response, done,
submit_task_time_ns, this]() {
int64_t wait_execution_time_ns = MonotonicNanos() -
submit_task_time_ns;
brpc::ClosureGuard closure_guard(done);
int64_t execution_time_ns = 0;
@@ -408,32 +332,20 @@ void
PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
response->set_wait_execution_time_us(wait_execution_time_ns /
NANOS_PER_MICRO);
});
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
- }
}
void
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
controller,
const
PTabletWriterCancelRequest* request,
PTabletWriterCancelResult*
response,
google::protobuf::Closure*
done) {
- bool ret = _light_work_pool.offer([this, request, done]() {
- VLOG_RPC << "tablet writer cancel, id=" << request->id()
- << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id();
- brpc::ClosureGuard closure_guard(done);
- auto st = _exec_env->load_channel_mgr()->cancel(*request);
- if (!st.ok()) {
- LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
- << ", index_id=" << request->index_id()
- << ", sender_id=" << request->sender_id();
- }
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
+ VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id="
<< request->index_id()
+ << ", sender_id=" << request->sender_id();
+ brpc::ClosureGuard closure_guard(done);
+ auto st = _exec_env->load_channel_mgr()->cancel(*request);
+ if (!st.ok()) {
+ LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
+ << ", index_id=" << request->index_id()
+ << ", sender_id=" << request->sender_id();
}
}
@@ -465,404 +377,309 @@ Status PInternalServiceImpl::_exec_plan_fragment(const
std::string& ser_request,
}
}
-void
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
controller,
+void
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
cntl_base,
const
PCancelPlanFragmentRequest* request,
PCancelPlanFragmentResult*
result,
google::protobuf::Closure*
done) {
- bool ret = _light_work_pool.offer([this, controller, request, result,
done]() {
- auto span =
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
- auto scope = OpentelemetryScope {span};
- brpc::ClosureGuard closure_guard(done);
- TUniqueId tid;
- tid.__set_hi(request->finst_id().hi());
- tid.__set_lo(request->finst_id().lo());
-
- Status st = Status::OK();
- if (request->has_cancel_reason()) {
- LOG(INFO) << "cancel fragment, fragment_instance_id=" <<
print_id(tid)
- << ", reason: " << request->cancel_reason();
- _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
- } else {
- LOG(INFO) << "cancel fragment, fragment_instance_id=" <<
print_id(tid);
- _exec_env->fragment_mgr()->cancel(tid);
- }
- // TODO: the logic seems useless, cancel only return Status::OK.
remove it
- st.to_protobuf(result->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
+ auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start",
cntl_base);
+ auto scope = OpentelemetryScope {span};
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId tid;
+ tid.__set_hi(request->finst_id().hi());
+ tid.__set_lo(request->finst_id().lo());
+
+ Status st;
+ if (request->has_cancel_reason()) {
+ LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
+ << ", reason: " << request->cancel_reason();
+ st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+ } else {
+ LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
+ st = _exec_env->fragment_mgr()->cancel(tid);
}
+ if (!st.ok()) {
+ LOG(WARNING) << "cancel plan fragment failed, errmsg=" <<
st.get_error_msg();
+ }
+ st.to_protobuf(result->mutable_status());
}
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController*
cntl_base,
const PFetchDataRequest* request,
PFetchDataResult* result,
google::protobuf::Closure* done) {
- bool ret = _heavy_work_pool.offer([this, controller, request, result,
done]() {
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
- GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
- _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
- }
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+ GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+ _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController*
controller,
const PFetchTableSchemaRequest*
request,
PFetchTableSchemaResult* result,
google::protobuf::Closure* done)
{
- bool ret = _heavy_work_pool.offer([request, result, done]() {
- VLOG_RPC << "fetch table schema";
- brpc::ClosureGuard closure_guard(done);
- TFileScanRange file_scan_range;
- Status st = Status::OK();
- {
- const uint8_t* buf = (const
uint8_t*)(request->file_scan_range().data());
- uint32_t len = request->file_scan_range().size();
- st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
- if (!st.ok()) {
- LOG(WARNING) << "fetch table schema failed, errmsg=" <<
st.get_error_msg();
- st.to_protobuf(result->mutable_status());
- return;
- }
- }
- if (file_scan_range.__isset.ranges == false) {
- st = Status::InternalError("can not get TFileRangeDesc.");
- st.to_protobuf(result->mutable_status());
- return;
- }
- if (file_scan_range.__isset.params == false) {
- st = Status::InternalError("can not get TFileScanRangeParams.");
- st.to_protobuf(result->mutable_status());
- return;
- }
- const TFileRangeDesc& range = file_scan_range.ranges.at(0);
- const TFileScanRangeParams& params = file_scan_range.params;
-
- std::unique_ptr<vectorized::GenericReader> reader(nullptr);
- std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema"));
- switch (params.format_type) {
- case TFileFormatType::FORMAT_CSV_PLAIN:
- case TFileFormatType::FORMAT_CSV_GZ:
- case TFileFormatType::FORMAT_CSV_BZ2:
- case TFileFormatType::FORMAT_CSV_LZ4FRAME:
- case TFileFormatType::FORMAT_CSV_LZOP:
- case TFileFormatType::FORMAT_CSV_DEFLATE: {
- // file_slots is no use
- std::vector<SlotDescriptor*> file_slots;
- reader.reset(new vectorized::CsvReader(profile.get(), params,
range, file_slots));
- break;
- }
- case TFileFormatType::FORMAT_PARQUET: {
- reader.reset(new vectorized::ParquetReader(params, range));
- break;
- }
- case TFileFormatType::FORMAT_ORC: {
- std::vector<std::string> column_names;
- reader.reset(new vectorized::OrcReader(params, range,
column_names, ""));
- break;
- }
- case TFileFormatType::FORMAT_JSON: {
- std::vector<SlotDescriptor*> file_slots;
- reader.reset(new vectorized::NewJsonReader(profile.get(), params,
range, file_slots));
- break;
- }
- default:
- st = Status::InternalError("Not supported file format in fetch
table schema: {}",
- params.format_type);
- st.to_protobuf(result->mutable_status());
- return;
- }
- std::vector<std::string> col_names;
- std::vector<TypeDescriptor> col_types;
- st = reader->get_parsed_schema(&col_names, &col_types);
+ VLOG_RPC << "fetch table schema";
+ brpc::ClosureGuard closure_guard(done);
+ TFileScanRange file_scan_range;
+ Status st = Status::OK();
+ {
+ const uint8_t* buf = (const
uint8_t*)(request->file_scan_range().data());
+ uint32_t len = request->file_scan_range().size();
+ st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
if (!st.ok()) {
LOG(WARNING) << "fetch table schema failed, errmsg=" <<
st.get_error_msg();
st.to_protobuf(result->mutable_status());
return;
}
- result->set_column_nums(col_names.size());
- for (size_t idx = 0; idx < col_names.size(); ++idx) {
- result->add_column_names(col_names[idx]);
- }
- for (size_t idx = 0; idx < col_types.size(); ++idx) {
- PTypeDesc* type_desc = result->add_column_types();
- col_types[idx].to_protobuf(type_desc);
- }
+ }
+ if (file_scan_range.__isset.ranges == false) {
+ st = Status::InternalError("can not get TFileRangeDesc.");
st.to_protobuf(result->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
+ return;
}
+ if (file_scan_range.__isset.params == false) {
+ st = Status::InternalError("can not get TFileScanRangeParams.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+ const TFileScanRangeParams& params = file_scan_range.params;
+
+ std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+ std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema"));
+ switch (params.format_type) {
+ case TFileFormatType::FORMAT_CSV_PLAIN:
+ case TFileFormatType::FORMAT_CSV_GZ:
+ case TFileFormatType::FORMAT_CSV_BZ2:
+ case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+ case TFileFormatType::FORMAT_CSV_LZOP:
+ case TFileFormatType::FORMAT_CSV_DEFLATE: {
+ // file_slots is no use
+ std::vector<SlotDescriptor*> file_slots;
+ reader.reset(new vectorized::CsvReader(profile.get(), params, range,
file_slots));
+ break;
+ }
+ case TFileFormatType::FORMAT_PARQUET: {
+ reader.reset(new vectorized::ParquetReader(params, range));
+ break;
+ }
+ case TFileFormatType::FORMAT_ORC: {
+ std::vector<std::string> column_names;
+ reader.reset(new vectorized::OrcReader(params, range, column_names,
""));
+ break;
+ }
+ case TFileFormatType::FORMAT_JSON: {
+ std::vector<SlotDescriptor*> file_slots;
+ reader.reset(new vectorized::NewJsonReader(profile.get(), params,
range, file_slots));
+ break;
+ }
+ default:
+ st = Status::InternalError("Not supported file format in fetch table
schema: {}",
+ params.format_type);
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ std::vector<std::string> col_names;
+ std::vector<TypeDescriptor> col_types;
+ st = reader->get_parsed_schema(&col_names, &col_types);
+ if (!st.ok()) {
+ LOG(WARNING) << "fetch table schema failed, errmsg=" <<
st.get_error_msg();
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ result->set_column_nums(col_names.size());
+ for (size_t idx = 0; idx < col_names.size(); ++idx) {
+ result->add_column_names(col_names[idx]);
+ }
+ for (size_t idx = 0; idx < col_types.size(); ++idx) {
+ PTypeDesc* type_desc = result->add_column_types();
+ col_types[idx].to_protobuf(type_desc);
+ }
+ st.to_protobuf(result->mutable_status());
}
void PInternalServiceImpl::get_info(google::protobuf::RpcController*
controller,
const PProxyRequest* request,
PProxyResult* response,
google::protobuf::Closure* done) {
- bool ret = _heavy_work_pool.offer([this, request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- // PProxyRequest is defined in gensrc/proto/internal_service.proto
- // Currently it supports 2 kinds of requests:
- // 1. get all kafka partition ids for given topic
- // 2. get all kafka partition offsets for given topic and timestamp.
- if (request->has_kafka_meta_request()) {
- const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
- if (!kafka_request.partition_id_for_latest_offsets().empty()) {
- // get latest offsets for specified partition ids
- std::vector<PIntegerPair> partition_offsets;
- Status st = _exec_env->routine_load_task_executor()
- ->get_kafka_latest_offsets_for_partitions(
- request->kafka_meta_request(),
&partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
- }
+ brpc::ClosureGuard closure_guard(done);
+ // PProxyRequest is defined in gensrc/proto/internal_service.proto
+ // Currently it supports 2 kinds of requests:
+ // 1. get all kafka partition ids for given topic
+ // 2. get all kafka partition offsets for given topic and timestamp.
+ if (request->has_kafka_meta_request()) {
+ const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
+ if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+ // get latest offsets for specified partition ids
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_latest_offsets_for_partitions(
+ request->kafka_meta_request(),
&partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
}
- st.to_protobuf(response->mutable_status());
- return;
- } else if (!kafka_request.offset_times().empty()) {
- // if offset_times() has elements, which means this request is
to get offset by timestamp.
- std::vector<PIntegerPair> partition_offsets;
- Status st = _exec_env->routine_load_task_executor()
- ->get_kafka_partition_offsets_for_times(
- request->kafka_meta_request(),
&partition_offsets);
- if (st.ok()) {
- PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
- }
+ }
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else if (!kafka_request.offset_times().empty()) {
+ // if offset_times() has elements, which means this request is to
get offset by timestamp.
+ std::vector<PIntegerPair> partition_offsets;
+ Status st =
+
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
+ request->kafka_meta_request(), &partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
}
- st.to_protobuf(response->mutable_status());
- return;
- } else {
- // get partition ids of topic
- std::vector<int32_t> partition_ids;
- Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
- request->kafka_meta_request(), &partition_ids);
- if (st.ok()) {
- PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
- for (int32_t id : partition_ids) {
- kafka_result->add_partition_ids(id);
- }
+ }
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else {
+ // get partition ids of topic
+ std::vector<int32_t> partition_ids;
+ Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+ request->kafka_meta_request(), &partition_ids);
+ if (st.ok()) {
+ PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
+ for (int32_t id : partition_ids) {
+ kafka_result->add_partition_ids(id);
}
- st.to_protobuf(response->mutable_status());
- return;
}
+ st.to_protobuf(response->mutable_status());
+ return;
}
- Status::OK().to_protobuf(response->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
+ Status::OK().to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::update_cache(google::protobuf::RpcController*
controller,
const PUpdateCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([this, request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->update(request, response);
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->set_status(PCacheStatus::CANCELED);
- }
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->update(request, response);
}
void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController*
controller,
const PFetchCacheRequest* request,
PFetchCacheResult* result,
google::protobuf::Closure* done) {
- bool ret = _heavy_work_pool.offer([this, request, result, done]() {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->fetch(request, result);
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- result->set_status(PCacheStatus::CANCELED);
- }
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->fetch(request, result);
}
void PInternalServiceImpl::clear_cache(google::protobuf::RpcController*
controller,
const PClearCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([this, request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->clear(request, response);
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->set_status(PCacheStatus::CANCELED);
- }
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->clear(request, response);
}
void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController*
controller,
const ::doris::PMergeFilterRequest*
request,
::doris::PMergeFilterResponse*
response,
::google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([this, controller, request, response,
done]() {
- brpc::ClosureGuard closure_guard(done);
- auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
- butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
- Status st = _exec_env->fragment_mgr()->merge_filter(request,
&zero_copy_input_stream);
- if (!st.ok()) {
- LOG(WARNING) << "merge meet error" << st.to_string();
- }
- st.to_protobuf(response->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ brpc::ClosureGuard closure_guard(done);
+ auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
+ butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+ Status st = _exec_env->fragment_mgr()->merge_filter(request,
&zero_copy_input_stream);
+ if (!st.ok()) {
+ LOG(WARNING) << "merge meet error" << st.to_string();
}
+ st.to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController*
controller,
const ::doris::PPublishFilterRequest*
request,
::doris::PPublishFilterResponse*
response,
::google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([this, controller, request, response,
done]() {
- brpc::ClosureGuard closure_guard(done);
- auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
- butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
- UniqueId unique_id(request->query_id());
- VLOG_NOTICE << "rpc apply_filter recv";
- Status st = _exec_env->fragment_mgr()->apply_filter(request,
&zero_copy_input_stream);
- if (!st.ok()) {
- LOG(WARNING) << "apply filter meet error: " << st.to_string();
- }
- st.to_protobuf(response->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ brpc::ClosureGuard closure_guard(done);
+ auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
+ butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+ UniqueId unique_id(request->query_id());
+ VLOG_NOTICE << "rpc apply_filter recv";
+ Status st = _exec_env->fragment_mgr()->apply_filter(request,
&zero_copy_input_stream);
+ if (!st.ok()) {
+ LOG(WARNING) << "apply filter meet error: " << st.to_string();
}
+ st.to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::send_data(google::protobuf::RpcController*
controller,
const PSendDataRequest* request,
PSendDataResult* response,
google::protobuf::Closure* done) {
- bool ret = _heavy_work_pool.offer([this, request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
-
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- for (int i = 0; i < request->data_size(); ++i) {
- PDataRow* row = new PDataRow();
- row->CopyFrom(request->data(i));
- pipe->append_and_flush(reinterpret_cast<char*>(&row),
sizeof(row),
- sizeof(row) + row->ByteSizeLong());
- }
- response->mutable_status()->set_status_code(0);
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ for (int i = 0; i < request->data_size(); ++i) {
+ PDataRow* row = new PDataRow();
+ row->CopyFrom(request->data(i));
+ pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
+ sizeof(row) + row->ByteSizeLong());
}
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ response->mutable_status()->set_status_code(0);
}
}
void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
const PCommitRequest* request,
PCommitResult* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([this, request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
-
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- pipe->finish();
- response->mutable_status()->set_status_code(0);
- }
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ pipe->finish();
+ response->mutable_status()->set_status_code(0);
}
}
void PInternalServiceImpl::rollback(google::protobuf::RpcController*
controller,
const PRollbackRequest* request,
PRollbackResult* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([this, request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
-
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- pipe->cancel("rollback");
- response->mutable_status()->set_status_code(0);
- }
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ pipe->cancel("rollback");
+ response->mutable_status()->set_status_code(0);
}
}
-void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController*
cntl_base,
const PConstantExprRequest*
request,
PConstantExprResult* response,
google::protobuf::Closure* done)
{
- bool ret = _light_work_pool.offer([this, request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- Status st = Status::OK();
+ brpc::ClosureGuard closure_guard(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+ Status st = Status::OK();
+ if (request->has_request()) {
st = _fold_constant_expr(request->request(), response);
- if (!st.ok()) {
- LOG(WARNING) << "exec fold constant expr failed, errmsg=" <<
st.get_error_msg();
- }
- st.to_protobuf(response->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ } else {
+ // TODO(yangzhengguo) this is just for compatible with old version,
this should be removed in the release 0.15
+ st = _fold_constant_expr(cntl->request_attachment().to_string(),
response);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "exec fold constant expr failed, errmsg=" <<
st.get_error_msg();
}
+ st.to_protobuf(response->mutable_status());
}
Status PInternalServiceImpl::_fold_constant_expr(const std::string&
ser_request,
@@ -879,48 +696,31 @@ Status PInternalServiceImpl::_fold_constant_expr(const
std::string& ser_request,
return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
}
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController*
cntl_base,
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done) {
- bool ret = _heavy_work_pool.offer([this, controller, request, response,
done]() {
- // TODO(zxy) delete in 1.2 version
- google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
- attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+ // TODO(zxy) delete in 1.2 version
+ google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+ attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
- _transmit_block(controller, request, response, new_done, Status::OK());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
- }
+ _transmit_block(cntl_base, request, response, new_done, Status::OK());
}
-void
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController*
controller,
+void
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController*
cntl_base,
const PEmptyRequest* request,
PTransmitDataResult*
response,
google::protobuf::Closure*
done) {
- bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
- PTransmitDataParams* new_request = new PTransmitDataParams();
- google::protobuf::Closure* new_done =
- new NewHttpClosure<PTransmitDataParams>(new_request, done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
- Status st =
-
attachment_extract_request_contain_block<PTransmitDataParams>(new_request,
cntl);
- _transmit_block(controller, new_request, response, new_done, st);
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
- }
+ PTransmitDataParams* new_request = new PTransmitDataParams();
+ google::protobuf::Closure* new_done =
+ new NewHttpClosure<PTransmitDataParams>(new_request, done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+ Status st =
attachment_extract_request_contain_block<PTransmitDataParams>(new_request,
cntl);
+ _transmit_block(cntl_base, new_request, response, new_done, st);
}
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController*
cntl_base,
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done,
@@ -958,34 +758,25 @@ void
PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
const PCheckRPCChannelRequest*
request,
PCheckRPCChannelResponse*
response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(0);
- if (request->data().size() != request->size()) {
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(0);
+ if (request->data().size() != request->size()) {
+ std::stringstream ss;
+ ss << "data size not same, expected: " << request->size()
+ << ", actual: " << request->data().size();
+ response->mutable_status()->add_error_msgs(ss.str());
+ response->mutable_status()->set_status_code(1);
+
+ } else {
+ Md5Digest digest;
+ digest.update(static_cast<const void*>(request->data().c_str()),
request->data().size());
+ digest.digest();
+ if (!iequal(digest.hex(), request->md5())) {
std::stringstream ss;
- ss << "data size not same, expected: " << request->size()
- << ", actual: " << request->data().size();
+ ss << "md5 not same, expected: " << request->md5() << ", actual: "
<< digest.hex();
response->mutable_status()->add_error_msgs(ss.str());
response->mutable_status()->set_status_code(1);
-
- } else {
- Md5Digest digest;
- digest.update(static_cast<const void*>(request->data().c_str()),
- request->data().size());
- digest.digest();
- if (!iequal(digest.hex(), request->md5())) {
- std::stringstream ss;
- ss << "md5 not same, expected: " << request->md5() << ",
actual: " << digest.hex();
- response->mutable_status()->add_error_msgs(ss.str());
- response->mutable_status()->set_status_code(1);
- }
}
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
@@ -993,60 +784,44 @@ void
PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
const PResetRPCChannelRequest*
request,
PResetRPCChannelResponse*
response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(0);
- if (request->all()) {
- int size =
ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
- if (size > 0) {
- std::vector<std::string> endpoints;
-
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
- ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
- *response->mutable_channels() = {endpoints.begin(),
endpoints.end()};
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(0);
+ if (request->all()) {
+ int size =
ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
+ if (size > 0) {
+ std::vector<std::string> endpoints;
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
+ *response->mutable_channels() = {endpoints.begin(),
endpoints.end()};
+ }
+ } else {
+ for (const std::string& endpoint : request->endpoints()) {
+ if
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
+ response->mutable_status()->add_error_msgs(endpoint + ": not
found.");
+ continue;
}
- } else {
- for (const std::string& endpoint : request->endpoints()) {
- if
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
- response->mutable_status()->add_error_msgs(endpoint + ":
not found.");
- continue;
- }
- if
(ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
- response->add_channels(endpoint);
- } else {
- response->mutable_status()->add_error_msgs(endpoint + ":
reset failed.");
- }
- }
- if (request->endpoints_size() != response->channels_size()) {
- response->mutable_status()->set_status_code(1);
+ if
(ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
+ response->add_channels(endpoint);
+ } else {
+ response->mutable_status()->add_error_msgs(endpoint + ": reset
failed.");
}
}
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ if (request->endpoints_size() != response->channels_size()) {
+ response->mutable_status()->set_status_code(1);
+ }
}
}
-void PInternalServiceImpl::hand_shake(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController*
cntl_base,
const PHandShakeRequest* request,
PHandShakeResponse* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.offer([request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- if (request->has_hello()) {
- response->set_hello(request->hello());
- }
- response->mutable_status()->set_status_code(0);
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ brpc::ClosureGuard closure_guard(done);
+ if (request->has_hello()) {
+ response->set_hello(request->hello());
}
+ response->mutable_status()->set_status_code(0);
}
void PInternalServiceImpl::request_slave_tablet_pull_rowset(
@@ -1061,8 +836,7 @@ void
PInternalServiceImpl::request_slave_tablet_pull_rowset(
int64_t brpc_port = request->brpc_port();
std::string token = request->token();
int64_t node_id = request->node_id();
- bool ret = _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port,
node_id, segments_size,
- http_port, token, rowset_path, this]() {
+ _slave_replica_worker_pool.offer([=]() {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
rowset_meta_pb.tablet_id(),
rowset_meta_pb.tablet_schema_hash());
if (tablet == nullptr) {
@@ -1205,12 +979,6 @@ void
PInternalServiceImpl::request_slave_tablet_pull_rowset(
_response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
rowset_meta->tablet_id(), node_id, true);
});
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
- }
Status::OK().to_protobuf(response->mutable_status());
}
@@ -1269,22 +1037,14 @@ void
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
void PInternalServiceImpl::response_slave_tablet_pull_rowset(
google::protobuf::RpcController* controller, const
PTabletWriteSlaveDoneRequest* request,
PTabletWriteSlaveDoneResult* response, google::protobuf::Closure*
done) {
- bool ret = _heavy_work_pool.offer([request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- VLOG_CRITICAL << "receive the result of slave replica pull rowset from
slave replica. "
- "slave server="
- << request->node_id() << ", is_succeed=" <<
request->is_succeed()
- << ", tablet_id=" << request->tablet_id() << ", txn_id="
<< request->txn_id();
-
StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
- request->txn_id(), request->tablet_id(), request->node_id(),
request->is_succeed());
- Status::OK().to_protobuf(response->mutable_status());
- });
- if (!ret) {
- LOG(WARNING) << "fail to offer request to the work pool";
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
- }
+ brpc::ClosureGuard closure_guard(done);
+ VLOG_CRITICAL
+ << "receive the result of slave replica pull rowset from slave
replica. slave server="
+ << request->node_id() << ", is_succeed=" << request->is_succeed()
+ << ", tablet_id=" << request->tablet_id() << ", txn_id=" <<
request->txn_id();
+ StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
+ request->txn_id(), request->tablet_id(), request->node_id(),
request->is_succeed());
+ Status::OK().to_protobuf(response->mutable_status());
}
} // namespace doris
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index e5855d98f3..3ea3655974 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -194,13 +194,8 @@ private:
private:
ExecEnv* _exec_env;
-
- // every brpc service request should put into thread pool
- // the reason see issue #16634
- // define the interface for reading and writing data as heavy interface
- // otherwise as light interface
- PriorityThreadPool _heavy_work_pool;
- PriorityThreadPool _light_work_pool;
+ PriorityThreadPool _tablet_worker_pool;
+ PriorityThreadPool _slave_replica_worker_pool;
};
} // namespace doris
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 648414f3e1..da02613000 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -212,16 +212,6 @@ public:
IntCounter* upload_rowset_count;
IntCounter* upload_fail_count;
- UIntGauge* light_work_pool_queue_size;
- UIntGauge* heavy_work_pool_queue_size;
- UIntGauge* heavy_work_active_threads;
- UIntGauge* light_work_active_threads;
-
- UIntGauge* heavy_work_pool_max_queue_size;
- UIntGauge* light_work_pool_max_queue_size;
- UIntGauge* heavy_work_max_threads;
- UIntGauge* light_work_max_threads;
-
static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
diff --git a/be/src/util/priority_thread_pool.hpp
b/be/src/util/priority_thread_pool.hpp
index 077b9757c8..32fc637970 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -54,7 +54,7 @@ public:
// queue exceeds this size, subsequent calls to Offer will block until
there is
// capacity available.
PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const
std::string& name)
- : _work_queue(queue_size), _shutdown(false), _name(name),
_active_threads(0) {
+ : _work_queue(queue_size), _shutdown(false), _name(name) {
for (int i = 0; i < num_threads; ++i) {
_threads.create_thread(
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -100,7 +100,6 @@ public:
virtual void join() { _threads.join_all(); }
virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
- virtual uint32_t get_active_threads() const { return _active_threads; }
// Blocks until the work queue is empty, and then calls shutdown to stop
the worker
// threads and Join to wait until they are finished.
@@ -136,9 +135,7 @@ private:
while (!is_shutdown()) {
Task task;
if (_work_queue.blocking_get(&task)) {
- _active_threads++;
task.work_function();
- _active_threads--;
}
if (_work_queue.get_size() == 0) {
_empty_cv.notify_all();
@@ -153,7 +150,6 @@ private:
// Set to true when threads should stop doing work and terminate.
std::atomic<bool> _shutdown;
std::string _name;
- std::atomic<int> _active_threads;
};
} // namespace doris
diff --git
a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 464c59c309..610125abe3 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -295,12 +295,6 @@ curl http://be_host:webserver_port/metrics?type=json
|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 |
如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
|`doris_be_all_rowsets_num`| | Num | 当前所有 rowset 的个数 | | P0 |
|`doris_be_all_segments_num`| | Num | 当前所有 segment 的个数 | | P0 |
-|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 |
-|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 |
-|`doris_be_heavy_work_pool_queue_size`| | Num | brpc
heavy线程池队列最大长度,超过则阻塞提交work| | p0 |
-|`doris_be_light_work_pool_queue_size`| | Num | brpc
light线程池队列最大长度,超过则阻塞提交work| | p0 |
-|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | p0 |
-|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | p0 |
### 机器监控
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index be17516c64..0396e8944b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -243,7 +243,6 @@ enum PCacheStatus {
INVALID_KEY_RANGE = 6;
DATA_OVERDUE = 7;
EMPTY_DATA = 8;
- CANCELED = 9;
};
enum CacheType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]