This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6b54171778a1f3a2f4e63d86ccbd07e935d2368f
Author: yiguolei <[email protected]>
AuthorDate: Thu Mar 21 22:11:06 2024 +0800

    [bugfix](deadlock) pipelinex map lock should be scoped to the map only, not pipelinectx's cancel method (#32622)
    
    The global lock in fragment mgr should only protect the map logic; it must not be used to protect the cancel method.
    The fragment ctx cancel method should be protected by its own lock.
    Otherwise, query ctx cancel --> pipelinex fragment cancel --> query ctx cancel will deadlock.
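    
    To make the cycle concrete: std::mutex is not recursive, so a thread
    that re-acquires a lock it already holds never returns. Below is a
    minimal self-contained sketch of the pattern this fix removes
    (QueryCtx and fragment_cancel are illustrative stand-ins, not the
    actual Doris types):
    
        #include <functional>
        #include <mutex>
    
        // Illustrative stand-in for QueryContext; not the actual Doris class.
        struct QueryCtx {
            std::mutex map_lock;                   // plays the role of _pipeline_map_write_lock
            std::function<void()> fragment_cancel; // plays the role of the fragment's cancel()
    
            void cancel() {
                std::lock_guard<std::mutex> l(map_lock); // held across the callback: the bug
                if (fragment_cancel) {
                    fragment_cancel(); // the fragment may call back into QueryCtx::cancel
                }
            }
        };
    
        int main() {
            QueryCtx q;
            q.fragment_cancel = [&q] { q.cancel(); }; // re-acquires map_lock on the same thread
            q.cancel(); // undefined behavior that in practice hangs: self-deadlock
        }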
---
 be/src/pipeline/pipeline_fragment_context.cpp      |   1 +
 be/src/pipeline/pipeline_fragment_context.h        |   1 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |   1 +
 be/src/runtime/fragment_mgr.cpp                    | 106 ++++++++++++---------
 be/src/runtime/fragment_mgr.h                      |   7 --
 be/src/runtime/query_context.cpp                   |  32 +++++--
 .../main/java/org/apache/doris/qe/Coordinator.java |   4 +-
 7 files changed, 86 insertions(+), 66 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index aeab107109a..31f8423334f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -160,6 +160,7 @@ bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const {
 
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
+    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineFragmentContext::cancel")
             .tag("query_id", print_id(_query_ctx->query_id()))
             .tag("fragment_id", _fragment_id)
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 96936233b39..8ad36612f4a 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -213,6 +213,7 @@ protected:
 
     VecDateTimeValue _start_time;
     int _timeout = -1;
+    std::mutex _cancel_lock;
 
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 6aefea3c364..2ee60091a7d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -126,6 +126,7 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
 
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
+    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineXFragmentContext::cancel")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index cd4a029000c..d852385d265 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -978,71 +978,83 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query
 
 void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
                                const std::string& msg) {
-    std::unique_lock<std::mutex> state_lock(_lock);
-    return cancel_query_unlocked(query_id, reason, state_lock, msg);
-}
-
-// Cancel all instances/fragments of query, and set query_ctx of the query canceled at last.
-void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id,
-                                        const PPlanFragmentCancelReason& reason,
-                                        const std::unique_lock<std::mutex>& state_lock,
-                                        const std::string& msg) {
-    auto ctx = _query_ctx_map.find(query_id);
+    std::shared_ptr<QueryContext> query_ctx;
+    std::vector<TUniqueId> all_instance_ids;
+    {
+        std::lock_guard<std::mutex> state_lock(_lock);
+        auto ctx_iter = _query_ctx_map.find(query_id);
 
-    if (ctx == _query_ctx_map.end()) {
-        LOG(WARNING) << "Query " << print_id(query_id) << " does not exists, failed to cancel it";
-        return;
+        if (ctx_iter == _query_ctx_map.end()) {
+            LOG(WARNING) << "Query " << print_id(query_id)
+                         << " does not exists, failed to cancel it";
+            return;
+        }
+        query_ctx = ctx_iter->second;
+        // Copy instanceids to avoid concurrent modification.
+        // And to reduce the scope of lock.
+        all_instance_ids = query_ctx->fragment_instance_ids;
     }
-    if (ctx->second->enable_pipeline_x_exec()) {
-        ctx->second->cancel_all_pipeline_context(reason, msg);
+    if (query_ctx->enable_pipeline_x_exec()) {
+        query_ctx->cancel_all_pipeline_context(reason, msg);
     } else {
-        for (auto it : ctx->second->fragment_instance_ids) {
-            cancel_instance_unlocked(it, reason, state_lock, msg);
+        for (auto it : all_instance_ids) {
+            cancel_instance(it, reason, msg);
         }
     }
 
-    ctx->second->cancel(true, msg, Status::Cancelled(msg));
-    _query_ctx_map.erase(query_id);
+    query_ctx->cancel(true, msg, Status::Cancelled(msg));
+    {
+        std::lock_guard<std::mutex> state_lock(_lock);
+        _query_ctx_map.erase(query_id);
+    }
     LOG(INFO) << "Query " << print_id(query_id) << " is cancelled and removed. 
Reason: " << msg;
 }
 
 void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
                                   const PPlanFragmentCancelReason& reason, const std::string& msg) {
-    std::unique_lock<std::mutex> state_lock(_lock);
-    return cancel_instance_unlocked(instance_id, reason, state_lock, msg);
-}
-
-void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
-                                           const PPlanFragmentCancelReason& reason,
-                                           const std::unique_lock<std::mutex>& state_lock,
-                                           const std::string& msg) {
-    const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
-
-    if (is_pipeline_instance) {
-        auto itr = _pipeline_map.find(instance_id);
-
-        if (itr != _pipeline_map.end()) {
-            // calling PipelineFragmentContext::cancel
-            itr->second->cancel(reason, msg);
-        } else {
-            LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
-                         << " to cancel";
-        }
-    } else {
-        auto itr = _fragment_instance_map.find(instance_id);
-        if (itr != _fragment_instance_map.end()) {
-            // calling PlanFragmentExecutor::cancel
-            itr->second->cancel(reason, msg);
+    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
+    std::shared_ptr<PlanFragmentExecutor> non_pipeline_ctx;
+    {
+        std::lock_guard<std::mutex> state_lock(_lock);
+        const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
+        if (is_pipeline_instance) {
+            auto itr = _pipeline_map.find(instance_id);
+            if (itr != _pipeline_map.end()) {
+                pipeline_ctx = itr->second;
+            } else {
+                LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
+                             << " to cancel";
+                return;
+            }
         } else {
-            LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id)
-                         << " to cancel";
+            auto itr = _fragment_instance_map.find(instance_id);
+            if (itr != _fragment_instance_map.end()) {
+                non_pipeline_ctx = itr->second;
+            } else {
+                LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id)
+                             << " to cancel";
+                return;
+            }
         }
     }
+
+    if (pipeline_ctx != nullptr) {
+        pipeline_ctx->cancel(reason, msg);
+    } else if (non_pipeline_ctx != nullptr) {
+        // calling PlanFragmentExecutor::cancel
+        non_pipeline_ctx->cancel(reason, msg);
+    }
 }
 
 void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
                                   const PPlanFragmentCancelReason& reason, const std::string& msg) {
-    if (auto q_ctx = _query_ctx_map.find(query_id)->second) {
+    std::unique_lock<std::mutex> lock(_lock);
+    auto q_ctx_iter = _query_ctx_map.find(query_id);
+    if (q_ctx_iter != _query_ctx_map.end()) {
+        // Has to use value to keep the shared ptr not deconstructed.
+        std::shared_ptr<QueryContext> q_ctx = q_ctx_iter->second;
+        // the lock should only be used to protect the map, not scope query ctx
+        lock.unlock();
         WARN_IF_ERROR(q_ctx->cancel_pipeline_context(fragment_id, reason, msg),
                       "fail to cancel fragment");
     } else {
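
The fragment_mgr.cpp hunks above all apply the same idiom: hold _lock only
long enough to find the entry and copy the shared_ptr out of the map, then
release the lock before calling cancel. A minimal sketch of that idiom
follows; Mgr and Ctx are illustrative names, not the actual Doris types.

    #include <map>
    #include <memory>
    #include <mutex>

    struct Ctx {
        void cancel() { /* may take other locks or call back into the manager */ }
    };

    struct Mgr {
        std::mutex lock_;                         // protects map_ and nothing else
        std::map<int, std::shared_ptr<Ctx>> map_;

        void cancel(int id) {
            std::shared_ptr<Ctx> ctx;
            {
                std::lock_guard<std::mutex> l(lock_); // lock scope: the map lookup only
                auto it = map_.find(id);
                if (it == map_.end()) {
                    return; // nothing to cancel
                }
                ctx = it->second; // the copy keeps the context alive after unlock
            }
            ctx->cancel(); // runs without holding lock_, so re-entry cannot deadlock
        }
    };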
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index ee7d45a6bfd..3435d1f4f64 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -99,10 +99,6 @@ public:
     // Cancel instance (pipeline or nonpipeline).
     void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason,
                          const std::string& msg = "");
-    void cancel_instance_unlocked(const TUniqueId& instance_id,
-                                  const PPlanFragmentCancelReason& reason,
-                                  const std::unique_lock<std::mutex>& state_lock,
-                                  const std::string& msg = "");
     // Cancel fragment (only pipelineX).
     // {query id fragment} -> PipelineXFragmentContext
     void cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
@@ -111,9 +107,6 @@ public:
     // Can be used in both version.
     void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
                       const std::string& msg = "");
-    void cancel_query_unlocked(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
-                               const std::unique_lock<std::mutex>& state_lock,
-                               const std::string& msg = "");
 
     bool query_is_canceled(const TUniqueId& query_id);
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index b2f578a2a75..4fb5df7c7dd 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -165,15 +165,19 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
     _is_cancelled.store(v);
 
     set_ready_to_execute(true);
+    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
     {
         std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
         for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
             if (fragment_id == f_id) {
                 continue;
             }
-            if (auto pipeline_ctx = f_context.lock()) {
-                pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
-            }
+            ctx_to_cancel.push_back(f_context);
+        }
+    }
+    for (auto& f_context : ctx_to_cancel) {
+        if (auto pipeline_ctx = f_context.lock()) {
+            pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
         }
     }
     return true;
@@ -181,8 +185,14 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
 
 void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason,
                                                const std::string& msg) {
-    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
-    for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
+    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
+    {
+        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
+            ctx_to_cancel.push_back(f_context);
+        }
+    }
+    for (auto& f_context : ctx_to_cancel) {
         if (auto pipeline_ctx = f_context.lock()) {
             pipeline_ctx->cancel(reason, msg);
         }
@@ -192,11 +202,15 @@ void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason&
 Status QueryContext::cancel_pipeline_context(const int fragment_id,
                                              const PPlanFragmentCancelReason& reason,
                                              const std::string& msg) {
-    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
-    if (!_fragment_id_to_pipeline_ctx.contains(fragment_id)) {
-        return Status::InternalError("fragment_id_to_pipeline_ctx is empty!");
+    std::weak_ptr<pipeline::PipelineFragmentContext> ctx_to_cancel;
+    {
+        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+        if (!_fragment_id_to_pipeline_ctx.contains(fragment_id)) {
+            return Status::InternalError("fragment_id_to_pipeline_ctx is empty!");
+        }
+        ctx_to_cancel = _fragment_id_to_pipeline_ctx[fragment_id];
     }
-    if (auto pipeline_ctx = _fragment_id_to_pipeline_ctx[fragment_id].lock()) {
+    if (auto pipeline_ctx = ctx_to_cancel.lock()) {
         pipeline_ctx->cancel(reason, msg);
     }
     return Status::OK();
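
The query_context.cpp hunks follow the same shape with weak pointers:
snapshot the weak_ptrs under _pipeline_map_write_lock, then lock() and
cancel each context after the mutex is released, so a cancel that
re-enters the query context can take the map lock again. A minimal sketch
with illustrative names (Query and FragmentCtx are not the actual Doris
types):

    #include <memory>
    #include <mutex>
    #include <unordered_map>
    #include <vector>

    struct FragmentCtx {
        void cancel() { /* takes its own _cancel_lock; may re-enter the query */ }
    };

    struct Query {
        std::mutex map_lock; // plays the role of _pipeline_map_write_lock
        std::unordered_map<int, std::weak_ptr<FragmentCtx>> fragments;

        void cancel_all() {
            std::vector<std::weak_ptr<FragmentCtx>> snapshot;
            {
                std::lock_guard<std::mutex> l(map_lock);
                for (auto& [id, weak_ctx] : fragments) {
                    snapshot.push_back(weak_ctx); // copy the weak_ptrs under the lock...
                }
            }
            for (auto& weak_ctx : snapshot) { // ...and cancel them outside it
                if (auto ctx = weak_ctx.lock()) { // skip fragments already destroyed
                    ctx->cancel();
                }
            }
        }
    };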
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9da938ee746..5531dda0c86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3271,9 +3271,6 @@ public class Coordinator implements CoordInterface {
         // return true if cancel success. Otherwise, return false
 
         private synchronized boolean cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
-            if (!this.hasCanceled) {
-                return false;
-            }
             for (RuntimeProfile profile : taskProfile) {
                 profile.setIsCancel(true);
             }
@@ -3288,6 +3285,7 @@ public class Coordinator implements CoordInterface {
                 try {
                     
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
                             this.fragmentId, queryId, cancelReason);
+                    this.hasCanceled = true;
                 } catch (RpcException e) {
                     LOG.warn("cancel plan fragment get a exception, 
address={}:{}", brpcAddress.getHostname(),
                             brpcAddress.getPort());
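
A note on the Coordinator.java hunks above: the removed guard returned
false whenever hasCanceled was still unset, so a first cancel attempt
bailed out before the RPC was sent; with this change the guard is gone and
hasCanceled is recorded only after cancelPipelineXPlanFragmentAsync has
been issued.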

