This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f52906a86c [minor](pipeline) catch exception if task queue is closed 
(#53604)
7f52906a86c is described below

commit 7f52906a86cba0b47f43448a4bf728bef10d7bd0
Author: Gabriel <[email protected]>
AuthorDate: Tue Jul 22 09:22:11 2025 +0800

    [minor](pipeline) catch exception if task queue is closed (#53604)
    
    what(): [E6] WorkTaskQueue closed
    0# doris::Exception::Exception(int, std::basic_string_view<char,
    std::char_traits<char> > const&) at
    /root/doris/be/src/common/exception.cpp:0
    11:58:08
    1# doris::Exception::Exception(doris::Status const&) at
    /root/doris/be/src/common/exception.h:39
    11:58:08
    2# doris::pipeline::Dependency::set_ready() at
    /root/doris/be/src/pipeline/dependency.cpp:88
    11:58:08
    3# doris::pipeline::Dependency::set_always_ready() at
    
/usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_lock.h:113
    11:58:08
    4# doris::pipeline::PipelineTask::terminate() at
    
/usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_iterator.h:1103
    11:58:08
    5# doris::pipeline::PipelineFragmentContext::cancel(doris::Status) at
    
/usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_iterator.h:1103
    11:58:08
    6# doris::QueryContext::cancel_all_pipeline_context(doris::Status
    const&, int) at /root/doris/be/src/runtime/query_context.cpp:0
    11:58:08
    7# doris::QueryContext::cancel(doris::Status, int) at
    /root/doris/be/src/runtime/query_context.cpp:0
    11:58:08
    8#
    
doris::FragmentMgr::_check_brpc_available(std::shared_ptr<doris::PBackendService_Stub>
    const&, doris::FragmentMgr::BrpcItem const&) at
    /root/doris/be/src/runtime/fragment_mgr.cpp:0
    11:58:08
    9# doris::FragmentMgr::cancel_worker() at
    /root/doris/be/src/runtime/fragment_mgr.cpp:0
    11:58:08
    10# doris::Thread::supervise_thread(void*) at
    /usr/local/ldb-toolchain-v0.25/bin/../usr/include/pthread.h:563
    11:58:08
---
 be/src/pipeline/pipeline_task.cpp | 36 ++++++++++++++++++++----------------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index b4ea837b6a2..12c71b4daa4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -325,22 +325,26 @@ void PipelineTask::terminate() {
     std::unique_lock<std::mutex> lc(_dependency_lock);
     auto fragment = _fragment_context.lock();
     if (!is_finalized() && fragment) {
-        DCHECK(_wake_up_early || fragment->is_canceled());
-        std::for_each(_spill_dependencies.begin(), _spill_dependencies.end(),
-                      [&](Dependency* dep) { dep->set_always_ready(); });
-        std::for_each(_write_dependencies.begin(), _write_dependencies.end(),
-                      [&](Dependency* dep) { dep->set_always_ready(); });
-        std::for_each(_finish_dependencies.begin(), _finish_dependencies.end(),
-                      [&](Dependency* dep) { dep->set_always_ready(); });
-        std::for_each(_read_dependencies.begin(), _read_dependencies.end(),
-                      [&](std::vector<Dependency*>& deps) {
-                          std::for_each(deps.begin(), deps.end(),
-                                        [&](Dependency* dep) { 
dep->set_always_ready(); });
-                      });
-        // All `_execution_deps` will never be set blocking from ready. So we 
just set ready here.
-        std::for_each(_execution_dependencies.begin(), 
_execution_dependencies.end(),
-                      [&](Dependency* dep) { dep->set_ready(); });
-        _memory_sufficient_dependency->set_ready();
+        try {
+            DCHECK(_wake_up_early || fragment->is_canceled());
+            std::for_each(_spill_dependencies.begin(), 
_spill_dependencies.end(),
+                          [&](Dependency* dep) { dep->set_always_ready(); });
+            std::for_each(_write_dependencies.begin(), 
_write_dependencies.end(),
+                          [&](Dependency* dep) { dep->set_always_ready(); });
+            std::for_each(_finish_dependencies.begin(), 
_finish_dependencies.end(),
+                          [&](Dependency* dep) { dep->set_always_ready(); });
+            std::for_each(_read_dependencies.begin(), _read_dependencies.end(),
+                          [&](std::vector<Dependency*>& deps) {
+                              std::for_each(deps.begin(), deps.end(),
+                                            [&](Dependency* dep) { 
dep->set_always_ready(); });
+                          });
+            // All `_execution_deps` will never be set blocking from ready. So 
we just set ready here.
+            std::for_each(_execution_dependencies.begin(), 
_execution_dependencies.end(),
+                          [&](Dependency* dep) { dep->set_ready(); });
+            _memory_sufficient_dependency->set_ready();
+        } catch (const doris::Exception& e) {
+            LOG(WARNING) << "Terminate failed: " << e.code() << ", " << 
e.to_string();
+        }
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to