This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6b54171778a1f3a2f4e63d86ccbd07e935d2368f
Author: yiguolei <[email protected]>
AuthorDate: Thu Mar 21 22:11:06 2024 +0800

    [bugfix](deadlock) pipelinex map lock should only scope the map, not pipelinectx's cancel method (#32622)

    The global lock in fragment mgr should only protect the map logic; it must
    not be used to protect the cancel method. The fragment ctx cancel method
    should instead be protected by its own lock. Otherwise the chain
    query ctx cancel --> pipelinex fragment cancel --> query ctx cancel
    will deadlock.
---
 be/src/pipeline/pipeline_fragment_context.cpp      |   1 +
 be/src/pipeline/pipeline_fragment_context.h        |   1 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |   1 +
 be/src/runtime/fragment_mgr.cpp                    | 106 ++++++++++++---------
 be/src/runtime/fragment_mgr.h                      |   7 --
 be/src/runtime/query_context.cpp                   |  32 +++++--
 .../main/java/org/apache/doris/qe/Coordinator.java |   4 +-
 7 files changed, 86 insertions(+), 66 deletions(-)
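The gist of the fix, for readers skimming the patch: never call a context's cancel
method while still holding the map lock that is only meant to guard the lookup.
The standalone sketch below is a minimal illustration of that pattern, not code
from this patch; the names Manager, FragmentCtx and cancel_query are invented for
the example. It copies the shared_ptr out while the map lock is held, releases the
lock, then lets cancel() run under the context's own mutex, which is the role the
new _cancel_lock plays in the real code.

// Minimal sketch of the locking pattern, assuming an invented Manager/FragmentCtx
// pair; none of these names exist in Doris.
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct FragmentCtx {
    // Analogous to the new _cancel_lock: serializes cancel() on one fragment.
    std::mutex cancel_lock;
    bool cancelled = false;

    void cancel(const std::string& reason) {
        // Only this fragment's state is guarded here. The manager's map lock is
        // not held at this point, so a cancel that re-enters the manager could
        // still acquire the map lock without deadlocking.
        std::lock_guard<std::mutex> guard(cancel_lock);
        if (!cancelled) {
            cancelled = true;
            std::cout << "fragment cancelled: " << reason << "\n";
        }
    }
};

class Manager {
public:
    void add(int id, std::shared_ptr<FragmentCtx> ctx) {
        std::lock_guard<std::mutex> guard(_map_lock);
        _ctx_map[id] = std::move(ctx);
    }

    void cancel_query(int id, const std::string& reason) {
        std::shared_ptr<FragmentCtx> ctx;
        {
            // The map lock only protects the lookup; copy the shared_ptr out
            // so the object stays alive after the lock is released.
            std::lock_guard<std::mutex> guard(_map_lock);
            auto it = _ctx_map.find(id);
            if (it == _ctx_map.end()) {
                return;
            }
            ctx = it->second;
        }
        // cancel() runs outside the map lock.
        ctx->cancel(reason);
        {
            // Re-acquire the map lock only for the erase.
            std::lock_guard<std::mutex> guard(_map_lock);
            _ctx_map.erase(id);
        }
    }

private:
    std::mutex _map_lock;
    std::map<int, std::shared_ptr<FragmentCtx>> _ctx_map;
};

int main() {
    Manager mgr;
    mgr.add(1, std::make_shared<FragmentCtx>());
    mgr.cancel_query(1, "user cancelled");
    return 0;
}

Because cancel() never runs while the map lock is held, the cycle described above
(query ctx cancel --> pipelinex fragment cancel --> query ctx cancel) no longer
blocks on a lock that its own call chain already owns.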
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index aeab107109a..31f8423334f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -160,6 +160,7 @@ bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const {
 
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
+    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineFragmentContext::cancel")
             .tag("query_id", print_id(_query_ctx->query_id()))
             .tag("fragment_id", _fragment_id)
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 96936233b39..8ad36612f4a 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -213,6 +213,7 @@ protected:
 
     VecDateTimeValue _start_time;
     int _timeout = -1;
+    std::mutex _cancel_lock;
 
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 6aefea3c364..2ee60091a7d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -126,6 +126,7 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
 
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
+    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineXFragmentContext::cancel")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index cd4a029000c..d852385d265 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -978,71 +978,83 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query
 
 void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
                                const std::string& msg) {
-    std::unique_lock<std::mutex> state_lock(_lock);
-    return cancel_query_unlocked(query_id, reason, state_lock, msg);
-}
-
-// Cancel all instances/fragments of query, and set query_ctx of the query canceled at last.
-void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id,
-                                        const PPlanFragmentCancelReason& reason,
-                                        const std::unique_lock<std::mutex>& state_lock,
-                                        const std::string& msg) {
-    auto ctx = _query_ctx_map.find(query_id);
+    std::shared_ptr<QueryContext> query_ctx;
+    std::vector<TUniqueId> all_instance_ids;
+    {
+        std::lock_guard<std::mutex> state_lock(_lock);
+        auto ctx_iter = _query_ctx_map.find(query_id);
 
-    if (ctx == _query_ctx_map.end()) {
-        LOG(WARNING) << "Query " << print_id(query_id) << " does not exists, failed to cancel it";
-        return;
+        if (ctx_iter == _query_ctx_map.end()) {
+            LOG(WARNING) << "Query " << print_id(query_id)
+                         << " does not exists, failed to cancel it";
+            return;
+        }
+        query_ctx = ctx_iter->second;
+        // Copy instanceids to avoid concurrent modification.
+        // And to reduce the scope of lock.
+        all_instance_ids = query_ctx->fragment_instance_ids;
     }
-    if (ctx->second->enable_pipeline_x_exec()) {
-        ctx->second->cancel_all_pipeline_context(reason, msg);
+    if (query_ctx->enable_pipeline_x_exec()) {
+        query_ctx->cancel_all_pipeline_context(reason, msg);
     } else {
-        for (auto it : ctx->second->fragment_instance_ids) {
-            cancel_instance_unlocked(it, reason, state_lock, msg);
+        for (auto it : all_instance_ids) {
+            cancel_instance(it, reason, msg);
         }
     }
 
-    ctx->second->cancel(true, msg, Status::Cancelled(msg));
-    _query_ctx_map.erase(query_id);
+    query_ctx->cancel(true, msg, Status::Cancelled(msg));
+    {
+        std::lock_guard<std::mutex> state_lock(_lock);
+        _query_ctx_map.erase(query_id);
+    }
 
     LOG(INFO) << "Query " << print_id(query_id)
               << " is cancelled and removed. Reason: " << msg;
 }
 
 void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
                                   const PPlanFragmentCancelReason& reason,
                                   const std::string& msg) {
-    std::unique_lock<std::mutex> state_lock(_lock);
-    return cancel_instance_unlocked(instance_id, reason, state_lock, msg);
-}
-
-void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
-                                           const PPlanFragmentCancelReason& reason,
-                                           const std::unique_lock<std::mutex>& state_lock,
-                                           const std::string& msg) {
-    const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
-
-    if (is_pipeline_instance) {
-        auto itr = _pipeline_map.find(instance_id);
-
-        if (itr != _pipeline_map.end()) {
-            // calling PipelineFragmentContext::cancel
-            itr->second->cancel(reason, msg);
-        } else {
-            LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
-                         << " to cancel";
-        }
-    } else {
-        auto itr = _fragment_instance_map.find(instance_id);
-        if (itr != _fragment_instance_map.end()) {
-            // calling PlanFragmentExecutor::cancel
-            itr->second->cancel(reason, msg);
+    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
+    std::shared_ptr<PlanFragmentExecutor> non_pipeline_ctx;
+    {
+        std::lock_guard<std::mutex> state_lock(_lock);
+        const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
+        if (is_pipeline_instance) {
+            auto itr = _pipeline_map.find(instance_id);
+            if (itr != _pipeline_map.end()) {
+                pipeline_ctx = itr->second;
+            } else {
+                LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
+                             << " to cancel";
+                return;
+            }
         } else {
-            LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id)
-                         << " to cancel";
+            auto itr = _fragment_instance_map.find(instance_id);
+            if (itr != _fragment_instance_map.end()) {
+                non_pipeline_ctx = itr->second;
+            } else {
+                LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id)
+                             << " to cancel";
+                return;
+            }
         }
     }
+
+    if (pipeline_ctx != nullptr) {
+        pipeline_ctx->cancel(reason, msg);
+    } else if (non_pipeline_ctx != nullptr) {
+        // calling PlanFragmentExecutor::cancel
+        non_pipeline_ctx->cancel(reason, msg);
+    }
 }
 
 void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
                                   const PPlanFragmentCancelReason& reason,
                                   const std::string& msg) {
-    if (auto q_ctx = _query_ctx_map.find(query_id)->second) {
+    std::unique_lock<std::mutex> lock(_lock);
+    auto q_ctx_iter = _query_ctx_map.find(query_id);
+    if (q_ctx_iter != _query_ctx_map.end()) {
+        // Has to use value to keep the shared ptr not deconstructed.
+        std::shared_ptr<QueryContext> q_ctx = q_ctx_iter->second;
+        // the lock should only be used to protect the map, not scope query ctx
+        lock.unlock();
         WARN_IF_ERROR(q_ctx->cancel_pipeline_context(fragment_id, reason, msg),
                       "fail to cancel fragment");
     } else {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index ee7d45a6bfd..3435d1f4f64 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -99,10 +99,6 @@ public:
     // Cancel instance (pipeline or nonpipeline).
     void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason,
                          const std::string& msg = "");
-    void cancel_instance_unlocked(const TUniqueId& instance_id,
-                                  const PPlanFragmentCancelReason& reason,
-                                  const std::unique_lock<std::mutex>& state_lock,
-                                  const std::string& msg = "");
     // Cancel fragment (only pipelineX).
     // {query id fragment} -> PipelineXFragmentContext
     void cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
@@ -111,9 +107,6 @@ public:
     // Can be used in both version.
     void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
                       const std::string& msg = "");
-    void cancel_query_unlocked(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
-                               const std::unique_lock<std::mutex>& state_lock,
-                               const std::string& msg = "");
 
     bool query_is_canceled(const TUniqueId& query_id);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index b2f578a2a75..4fb5df7c7dd 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -165,15 +165,19 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
     _is_cancelled.store(v);
     set_ready_to_execute(true);
 
+    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
     {
         std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
         for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
             if (fragment_id == f_id) {
                 continue;
             }
-            if (auto pipeline_ctx = f_context.lock()) {
-                pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
-            }
+            ctx_to_cancel.push_back(f_context);
+        }
+    }
+    for (auto& f_context : ctx_to_cancel) {
+        if (auto pipeline_ctx = f_context.lock()) {
+            pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
         }
     }
     return true;
@@ -181,8 +185,14 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
 
 void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason,
                                                const std::string& msg) {
-    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
-    for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
+    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
+    {
+        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
+            ctx_to_cancel.push_back(f_context);
+        }
+    }
+    for (auto& f_context : ctx_to_cancel) {
         if (auto pipeline_ctx = f_context.lock()) {
             pipeline_ctx->cancel(reason, msg);
         }
@@ -192,11 +202,15 @@ void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason&
 Status QueryContext::cancel_pipeline_context(const int fragment_id,
                                              const PPlanFragmentCancelReason& reason,
                                              const std::string& msg) {
-    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
-    if (!_fragment_id_to_pipeline_ctx.contains(fragment_id)) {
-        return Status::InternalError("fragment_id_to_pipeline_ctx is empty!");
+    std::weak_ptr<pipeline::PipelineFragmentContext> ctx_to_cancel;
+    {
+        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+        if (!_fragment_id_to_pipeline_ctx.contains(fragment_id)) {
+            return Status::InternalError("fragment_id_to_pipeline_ctx is empty!");
+        }
+        ctx_to_cancel = _fragment_id_to_pipeline_ctx[fragment_id];
     }
-    if (auto pipeline_ctx = _fragment_id_to_pipeline_ctx[fragment_id].lock()) {
+    if (auto pipeline_ctx = ctx_to_cancel.lock()) {
         pipeline_ctx->cancel(reason, msg);
     }
     return Status::OK();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9da938ee746..5531dda0c86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3271,9 +3271,6 @@ public class Coordinator implements CoordInterface {
 
         // return true if cancel success. Otherwise, return false
         private synchronized boolean cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
-            if (!this.hasCanceled) {
-                return false;
-            }
             for (RuntimeProfile profile : taskProfile) {
                 profile.setIsCancel(true);
             }
@@ -3288,6 +3285,7 @@ public class Coordinator implements CoordInterface {
             try {
                 BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
                         this.fragmentId, queryId, cancelReason);
+                this.hasCanceled = true;
             } catch (RpcException e) {
                 LOG.warn("cancel plan fragment get a exception, address={}:{}",
                         brpcAddress.getHostname(), brpcAddress.getPort());

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
