This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7f52906a86c [minor](pipeline) catch exception if task queue is closed
(#53604)
7f52906a86c is described below
commit 7f52906a86cba0b47f43448a4bf728bef10d7bd0
Author: Gabriel <[email protected]>
AuthorDate: Tue Jul 22 09:22:11 2025 +0800
[minor](pipeline) catch exception if task queue is closed (#53604)
what(): [E6] WorkTaskQueue closed
0# doris::Exception::Exception(int, std::basic_string_view<char,
std::char_traits<char> > const&) at
/root/doris/be/src/common/exception.cpp:0
11:58:08
1# doris::Exception::Exception(doris::Status const&) at
/root/doris/be/src/common/exception.h:39
11:58:08
2# doris::pipeline::Dependency::set_ready() at
/root/doris/be/src/pipeline/dependency.cpp:88
11:58:08
3# doris::pipeline::Dependency::set_always_ready() at
/usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_lock.h:113
11:58:08
4# doris::pipeline::PipelineTask::terminate() at
/usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_iterator.h:1103
11:58:08
5# doris::pipeline::PipelineFragmentContext::cancel(doris::Status) at
/usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_iterator.h:1103
11:58:08
6# doris::QueryContext::cancel_all_pipeline_context(doris::Status
const&, int) at /root/doris/be/src/runtime/query_context.cpp:0
11:58:08
7# doris::QueryContext::cancel(doris::Status, int) at
/root/doris/be/src/runtime/query_context.cpp:0
11:58:08
8#
doris::FragmentMgr::_check_brpc_available(std::shared_ptr<doris::PBackendService_Stub>
const&, doris::FragmentMgr::BrpcItem const&) at
/root/doris/be/src/runtime/fragment_mgr.cpp:0
11:58:08
9# doris::FragmentMgr::cancel_worker() at
/root/doris/be/src/runtime/fragment_mgr.cpp:0
11:58:08
10# doris::Thread::supervise_thread(void*) at
/usr/local/ldb-toolchain-v0.25/bin/../usr/include/pthread.h:563
11:58:08
---
be/src/pipeline/pipeline_task.cpp | 36 ++++++++++++++++++++----------------
1 file changed, 20 insertions(+), 16 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index b4ea837b6a2..12c71b4daa4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -325,22 +325,26 @@ void PipelineTask::terminate() {
std::unique_lock<std::mutex> lc(_dependency_lock);
auto fragment = _fragment_context.lock();
if (!is_finalized() && fragment) {
- DCHECK(_wake_up_early || fragment->is_canceled());
- std::for_each(_spill_dependencies.begin(), _spill_dependencies.end(),
- [&](Dependency* dep) { dep->set_always_ready(); });
- std::for_each(_write_dependencies.begin(), _write_dependencies.end(),
- [&](Dependency* dep) { dep->set_always_ready(); });
- std::for_each(_finish_dependencies.begin(), _finish_dependencies.end(),
- [&](Dependency* dep) { dep->set_always_ready(); });
- std::for_each(_read_dependencies.begin(), _read_dependencies.end(),
- [&](std::vector<Dependency*>& deps) {
- std::for_each(deps.begin(), deps.end(),
- [&](Dependency* dep) {
dep->set_always_ready(); });
- });
- // All `_execution_deps` will never be set blocking from ready. So we
just set ready here.
- std::for_each(_execution_dependencies.begin(),
_execution_dependencies.end(),
- [&](Dependency* dep) { dep->set_ready(); });
- _memory_sufficient_dependency->set_ready();
+ try {
+ DCHECK(_wake_up_early || fragment->is_canceled());
+ std::for_each(_spill_dependencies.begin(),
_spill_dependencies.end(),
+ [&](Dependency* dep) { dep->set_always_ready(); });
+ std::for_each(_write_dependencies.begin(),
_write_dependencies.end(),
+ [&](Dependency* dep) { dep->set_always_ready(); });
+ std::for_each(_finish_dependencies.begin(),
_finish_dependencies.end(),
+ [&](Dependency* dep) { dep->set_always_ready(); });
+ std::for_each(_read_dependencies.begin(), _read_dependencies.end(),
+ [&](std::vector<Dependency*>& deps) {
+ std::for_each(deps.begin(), deps.end(),
+ [&](Dependency* dep) {
dep->set_always_ready(); });
+ });
+ // All `_execution_deps` will never be set blocking from ready. So
we just set ready here.
+ std::for_each(_execution_dependencies.begin(),
_execution_dependencies.end(),
+ [&](Dependency* dep) { dep->set_ready(); });
+ _memory_sufficient_dependency->set_ready();
+ } catch (const doris::Exception& e) {
+ LOG(WARNING) << "Terminate failed: " << e.code() << ", " <<
e.to_string();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]