This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e33f4f90ae [fix](exec) Avoid query thread block on wait_for_start
(#12411)
e33f4f90ae is described below
commit e33f4f90aed728c343757fcba3cc99aa4459185e
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Sep 13 08:57:37 2022 +0800
[fix](exec) Avoid query thread block on wait_for_start (#12411)
When the FE sends a cancel RPC to the BE, the thread waiting in wait_for_start()
is not notified, so the fragment stays blocked and keeps occupying an execution thread.
Add a max wait time to wait_for_start() so that it cannot block forever, and
make cancel() notify the waiting thread.
---
be/src/common/config.h | 8 +++-
be/src/runtime/fragment_mgr.cpp | 48 ++++++++++------------
be/src/runtime/plan_fragment_executor.cpp | 8 ++--
be/src/runtime/plan_fragment_executor.h | 5 ---
be/src/runtime/query_fragments_ctx.h | 13 ++++--
be/src/util/doris_metrics.h | 1 +
be/src/vec/exec/volap_scan_node.cpp | 5 +--
be/test/runtime/fragment_mgr_test.cpp | 8 ----
docs/sidebars.json | 1 -
.../maint-monitor/monitor-metrics/metrics.md | 1 +
10 files changed, 46 insertions(+), 52 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a099de3c4f..cf24e4de8b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -859,6 +859,13 @@ CONF_Bool(enable_new_scan_node, "true");
// limit the queue of pending batches which will be sent by a single
nodechannel
CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
+// Max time, in seconds, to wait for the "plan fragment start" rpc.
+// If the wait times out, the fragment will be cancelled.
+// This parameter usually only matters when the FE loses its connection:
+// the BE can then automatically cancel the relevant fragment after the timeout,
+// so as to avoid occupying the execution thread for a long time.
+CONF_mInt32(max_fragment_start_wait_time_seconds, "30");
+
#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");
@@ -869,7 +876,6 @@ CONF_String(test_s3_region, "region");
CONF_String(test_s3_bucket, "bucket");
CONF_String(test_s3_prefix, "prefix");
#endif
-
} // namespace config
} // namespace doris
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c5b3cbf818..1b86f57a94 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -60,6 +60,7 @@ namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size,
MetricUnit::NOUNIT);
std::string to_load_error_http_path(const std::string& file_name) {
if (file_name.empty()) {
@@ -95,8 +96,6 @@ public:
Status execute();
- Status cancel_before_execute();
-
Status cancel(const PPlanFragmentCancelReason& reason, const std::string&
msg = "");
TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
@@ -236,13 +235,20 @@ Status FragmentExecState::execute() {
if (_need_wait_execution_trigger) {
// if _need_wait_execution_trigger is true, which means this instance
// is prepared but need to wait for the signal to do the rest
execution.
- _fragments_ctx->wait_for_start();
- opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start
executing Fragment");
+ if (!_fragments_ctx->wait_for_start()) {
+ return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait
fragment start timeout");
+ }
+ }
+#ifndef BE_TEST
+ if (_executor.runtime_state()->is_cancelled()) {
+ return Status::Cancelled("cancelled before execution");
}
+#endif
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
CgroupsMgr::apply_system_cgroup();
+ opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start
executing Fragment");
WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while
opening fragment $0",
print_id(_fragment_instance_id)));
@@ -253,24 +259,9 @@ Status FragmentExecState::execute() {
return Status::OK();
}
-Status FragmentExecState::cancel_before_execute() {
- // set status as 'abort', cuz cancel() won't effect the status arg of
DataSink::close().
-#ifndef BE_TEST
- SCOPED_ATTACH_TASK(executor()->runtime_state());
-#endif
- _executor.set_abort();
- _executor.cancel();
- if (_pipe != nullptr) {
- _pipe->cancel("Execution aborted before start");
- }
- return Status::OK();
-}
-
Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
if (!_cancelled) {
- _cancelled = true;
std::lock_guard<std::mutex> l(_status_lock);
- RETURN_IF_ERROR(_exec_status);
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_executor.set_is_report_on_cancel(false);
}
@@ -278,6 +269,7 @@ Status FragmentExecState::cancel(const
PPlanFragmentCancelReason& reason, const
if (_pipe != nullptr) {
_pipe->cancel(PPlanFragmentCancelReason_Name(reason));
}
+ _cancelled = true;
}
return Status::OK();
}
@@ -452,11 +444,15 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
.set_max_threads(config::fragment_pool_thread_num_max)
.set_max_queue_size(config::fragment_pool_queue_size)
.build(&_thread_pool);
+
+ REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
+ [this]() { return _thread_pool->get_queue_size(); });
CHECK(s.ok()) << s.to_string();
}
FragmentMgr::~FragmentMgr() {
DEREGISTER_HOOK_METRIC(plan_fragment_count);
+ DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
_stop_background_threads_latch.count_down();
if (_cancel_thread) {
_cancel_thread->join();
@@ -565,7 +561,7 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
- search->second->set_ready_to_execute();
+ search->second->set_ready_to_execute(false);
return Status::OK();
}
@@ -712,9 +708,12 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(params.params.fragment_instance_id);
}
- exec_state->cancel_before_execute();
- return Status::InternalError("Put planfragment to thread pool failed.
err = {}, BE: {}",
- st.get_error_msg(),
BackendOptions::get_localhost());
+ exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+ "push plan fragment to thread pool failed");
+ return Status::InternalError(
+ strings::Substitute("push plan fragment $0 to thread pool
failed. err = $1, BE: $2",
+
print_id(params.params.fragment_instance_id),
+ st.get_error_msg(),
BackendOptions::get_localhost()));
}
return Status::OK();
@@ -761,9 +760,6 @@ void FragmentMgr::cancel_worker() {
}
for (auto it = _fragments_ctx_map.begin(); it !=
_fragments_ctx_map.end();) {
if (it->second->is_timeout(now)) {
- // The execution logic of the instance needs to be
notified.
- // The execution logic of the instance will eventually
cancel the execution plan.
- it->second->set_ready_to_execute();
it = _fragments_ctx_map.erase(it);
} else {
++it;
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index aab0b44379..ae5ee8f75a 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -628,6 +628,8 @@ void PlanFragmentExecutor::cancel(const
PPlanFragmentCancelReason& reason, const
_cancel_reason = reason;
_cancel_msg = msg;
_runtime_state->set_is_cancelled(true);
+ // To notify wait_for_start()
+ _runtime_state->get_query_fragments_ctx()->set_ready_to_execute(true);
// must close stream_mgr to avoid dead lock in Exchange Node
auto env = _runtime_state->exec_env();
@@ -638,10 +640,8 @@ void PlanFragmentExecutor::cancel(const
PPlanFragmentCancelReason& reason, const
env->stream_mgr()->cancel(id);
env->result_mgr()->cancel(id);
}
-}
-
-void PlanFragmentExecutor::set_abort() {
- update_status(Status::Aborted("Execution aborted before start"));
+ // Cancel the result queue manager used by spark doris connector
+ _exec_env->result_queue_mgr()->update_queue_status(id,
Status::Aborted(msg));
}
const RowDescriptor& PlanFragmentExecutor::row_desc() {
diff --git a/be/src/runtime/plan_fragment_executor.h
b/be/src/runtime/plan_fragment_executor.h
index d2558deedb..215ff0973b 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -123,11 +123,6 @@ public:
// in open()/get_next().
void close();
- // Abort this execution. Must be called if we skip running open().
- // It will let DataSink node closed with error status, to avoid use
resources which created in open() phase.
- // DataSink node should distinguish Aborted status from other error status.
- void set_abort();
-
// Initiate cancellation. Must not be called until after prepare()
returned.
void cancel(const PPlanFragmentCancelReason& reason =
PPlanFragmentCancelReason::INTERNAL_ERROR,
const std::string& msg = "");
diff --git a/be/src/runtime/query_fragments_ctx.h
b/be/src/runtime/query_fragments_ctx.h
index 80a07f47e4..8f9ceb38d6 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -20,6 +20,7 @@
#include <atomic>
#include <string>
+#include "common/config.h"
#include "common/object_pool.h"
#include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions
#include "gen_cpp/Types_types.h" // for TUniqueId
@@ -61,19 +62,22 @@ public:
ThreadPoolToken* get_token() { return _thread_token.get(); }
- void set_ready_to_execute() {
+ void set_ready_to_execute(bool is_cancelled) {
{
std::lock_guard<std::mutex> l(_start_lock);
+ _is_cancelled = is_cancelled;
_ready_to_execute = true;
}
_start_cond.notify_all();
}
- void wait_for_start() {
+ bool wait_for_start() {
+ int wait_time = config::max_fragment_start_wait_time_seconds;
std::unique_lock<std::mutex> l(_start_lock);
- while (!_ready_to_execute.load()) {
- _start_cond.wait(l);
+ while (!_ready_to_execute.load() && !_is_cancelled.load() &&
--wait_time > 0) {
+ _start_cond.wait_for(l, std::chrono::seconds(1));
}
+ return _ready_to_execute.load() && !_is_cancelled.load();
}
public:
@@ -112,6 +116,7 @@ private:
// Only valid when _need_wait_execution_trigger is set to true in
FragmentExecState.
// And all fragments of this query will start execution when this is set
to true.
std::atomic<bool> _ready_to_execute {false};
+ std::atomic<bool> _is_cancelled {false};
};
} // namespace doris
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 57585720c0..da9085671e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -200,6 +200,7 @@ public:
UIntGauge* send_batch_thread_pool_queue_size;
UIntGauge* download_cache_thread_pool_thread_num;
UIntGauge* download_cache_thread_pool_queue_size;
+ UIntGauge* fragment_thread_pool_queue_size;
// Upload metrics
UIntGauge* upload_total_byte;
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index 0f79b7426e..8197c88dbd 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -474,8 +474,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
num_rows_in_block < _runtime_state->batch_size())) {
if (UNLIKELY(_transfer_done)) {
eos = true;
- status = Status::Cancelled("Cancelled");
- LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach
limit.";
+ status = Status::Cancelled(
+ "Scan thread cancelled, cause query done, maybe reach
limit.");
break;
}
@@ -1082,7 +1082,6 @@ Status VOlapScanNode::get_next(RuntimeState* state,
Block* block, bool* eos) {
_block_consumed_cv.notify_all();
*eos = true;
- LOG(INFO) << "VOlapScanNode ReachedLimit.";
} else {
*eos = false;
}
diff --git a/be/test/runtime/fragment_mgr_test.cpp
b/be/test/runtime/fragment_mgr_test.cpp
index bcf1b9f250..060b3a501a 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -28,7 +28,6 @@ namespace doris {
static Status s_prepare_status;
static Status s_open_status;
-static int s_abort_cnt;
// Mock used for this unittest
PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
const report_status_callback&
report_status_cb)
@@ -49,11 +48,6 @@ Status PlanFragmentExecutor::open() {
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
}
-void PlanFragmentExecutor::set_abort() {
- LOG(INFO) << "Plan Aborted";
- s_abort_cnt++;
-}
-
void PlanFragmentExecutor::close() {}
class FragmentMgrTest : public testing::Test {
@@ -128,7 +122,6 @@ TEST_F(FragmentMgrTest, OfferPoolFailed) {
config::fragment_pool_thread_num_min = 1;
config::fragment_pool_thread_num_max = 1;
config::fragment_pool_queue_size = 0;
- s_abort_cnt = 0;
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
@@ -145,7 +138,6 @@ TEST_F(FragmentMgrTest, OfferPoolFailed) {
params.params.fragment_instance_id.__set_lo(200);
EXPECT_FALSE(mgr.exec_plan_fragment(params).ok());
}
- EXPECT_EQ(3, s_abort_cnt);
}
} // namespace doris
diff --git a/docs/sidebars.json b/docs/sidebars.json
index 2d8cdb36a0..1b1151e342 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -860,7 +860,6 @@
"admin-manual/maint-monitor/multi-tenant",
"admin-manual/maint-monitor/tablet-local-debug",
"admin-manual/maint-monitor/tablet-restore-tool",
- "admin-manual/maint-monitor/monitor-metrics/metrics",
"admin-manual/maint-monitor/metadata-operation"
]
},
diff --git
a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index d5789c05f5..12996ef240 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -275,6 +275,7 @@ curl http://be_host:webserver_port/metrics?type=json
|`doris_be_upload_total_byte`| | | 字节 | 冷热分离功能,上传到远端存储成功的rowset数据量累计值| |
|`doris_be_load_bytes`| | 字节|通过 tablet sink 发送的数量累计 | 可观测导入数据量 | P0 |
|`doris_be_load_rows`| | Num | 通过 tablet sink 发送的行数累计| 可观测导入数据量 | P0 |
+|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 |
如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
### 机器监控
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]