This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 005f7af21f9 [bugfix](deadlock) should not use query cancelled in fragment mgr
005f7af21f9 is described below

commit 005f7af21f93fc6a3e20a2cadd03ad24115325b9
Author: yiguolei <[email protected]>
AuthorDate: Tue Apr 9 15:45:51 2024 +0800

    [bugfix](deadlock) should not use query cancelled in fragment mgr
---
 be/src/runtime/fragment_mgr.cpp                | 30 --------------------------
 be/src/runtime/fragment_mgr.h                  |  2 --
 be/src/runtime/memory/thread_mem_tracker_mgr.h |  4 ++++
 be/src/runtime/thread_context.h                |  1 +
 be/src/vec/common/allocator.cpp                |  6 ++----
 5 files changed, 7 insertions(+), 36 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 68c4afa3821..de964b6da46 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1063,36 +1063,6 @@ void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id
     }
 }
 
-bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
-    std::lock_guard<std::mutex> lock(_lock);
-    auto ctx = _query_ctx_map.find(query_id);
-
-    if (ctx != _query_ctx_map.end()) {
-        const bool is_pipeline_version = ctx->second->enable_pipeline_exec();
-        const bool is_pipeline_x = ctx->second->enable_pipeline_x_exec();
-        if (is_pipeline_x) {
-            return ctx->second->is_cancelled();
-        } else {
-            for (auto itr : ctx->second->fragment_instance_ids) {
-                if (is_pipeline_version) {
-                    auto pipeline_ctx_iter = _pipeline_map.find(itr);
-                    if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) {
-                        return pipeline_ctx_iter->second->is_canceled();
-                    }
-                } else {
-                    auto fragment_instance_itr = _fragment_instance_map.find(itr);
-                    if (fragment_instance_itr != _fragment_instance_map.end() &&
-                        fragment_instance_itr->second) {
-                        return fragment_instance_itr->second->is_canceled();
-                    }
-                }
-            }
-        }
-    }
-
-    return true;
-}
-
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 3435d1f4f64..0bae4939b66 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -108,8 +108,6 @@ public:
     void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
                       const std::string& msg = "");
 
-    bool query_is_canceled(const TUniqueId& query_id);
-
     void cancel_worker();
 
     void debug(std::stringstream& ss) override;
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index b2fa3df9f8c..5f65d890f57 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -84,6 +84,10 @@ public:
 
     bool is_attach_query() { return _query_id != TUniqueId(); }
 
+    bool is_query_cancelled() const { return _is_query_cancelled; }
+
+    void reset_query_cancelled_flag(bool new_val) { _is_query_cancelled = new_val; }
+
     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
         CHECK(init());
         return _limiter_tracker;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 3c0fc66dda5..91b1d6f91ef 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -146,6 +146,7 @@ public:
 #endif
         _task_id = task_id;
         thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, _task_id);
+        thread_mem_tracker_mgr->reset_query_cancelled_flag(false);
     }
 
     void detach_task() {
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 9501c1bcfc9..02ae0dc4d71 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -72,8 +72,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
 
         // TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr.
         if (doris::is_thread_context_init() &&
-            doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
-                    doris::thread_context()->task_id())) {
+            doris::thread_context()->thread_mem_tracker_mgr->is_query_cancelled()) {
             if (doris::enable_thread_catch_bad_alloc) {
                 throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
             }
@@ -94,8 +93,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
                     doris::MemInfo::refresh_interval_memory_growth += size;
                     break;
                 }
-                if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
-                            doris::thread_context()->task_id())) {
+                if (doris::thread_context()->thread_mem_tracker_mgr->is_query_cancelled()) {
                     if (doris::enable_thread_catch_bad_alloc) {
                         throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
                     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to