This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7675383c400 [bugfix](deadlock) fix dead lock in cancel fragment (#33181)
7675383c400 is described below
commit 7675383c400a91b1813293306c45c45024854128
Author: yiguolei <[email protected]>
AuthorDate: Wed Apr 3 13:40:07 2024 +0800
[bugfix](deadlock) fix dead lock in cancel fragment (#33181)
Co-authored-by: yiguolei <[email protected]>
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 48 +++++++++++-----------
 be/src/pipeline/pipeline_fragment_context.h        |  1 -
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 36 ++++++++--------
 be/src/pipeline/task_scheduler.cpp                 |  4 +-
 be/src/runtime/fragment_mgr.cpp                    |  2 +-
 be/src/runtime/query_context.cpp                   | 14 ++++---
 be/src/runtime/query_context.h                     |  2 +-
 8 files changed, 56 insertions(+), 53 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 0eba79b25c5..3b02373ecbd 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -454,7 +454,7 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string& err) {
     _is_finishing = true;
-    _context->cancel(true, err, Status::Cancelled(err));
+    _context->cancel(err, Status::Cancelled(err));
     _ended(id);
 }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 31f8423334f..2c38ef9c890 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -158,9 +158,12 @@ bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const {
     return false;
 }
 
+// Must not hold a lock in this method: it calls QueryContext::cancel, and
+// QueryContext::cancel in turn cancels each fragment context. Running fragment
+// code (such as the exchange sink buffer) may also call QueryContext::cancel,
+// so taking a lock here could deadlock.
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
-    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineFragmentContext::cancel")
             .tag("query_id", print_id(_query_ctx->query_id()))
             .tag("fragment_id", _fragment_id)
@@ -172,30 +175,29 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     // can not be cancelled if other fragments set the query_ctx cancelled, this will
     // make the result receiver on FE be stuck on rpc forever until timeout...
     // We need a more detailed discussion.
-    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
-            _is_report_on_cancel = false;
-        } else {
-            LOG(WARNING) << "PipelineFragmentContext "
-                         << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
-                         << " is canceled, cancel message: " << msg;
-        }
-
-        _runtime_state->set_process_status(_query_ctx->exec_status());
-        // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
-        // For stream load the fragment's query_id == load id, it is set in FE.
-        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
-        if (stream_load_ctx != nullptr) {
-            stream_load_ctx->pipe->cancel(msg);
-        }
+    _query_ctx->cancel(msg, Status::Cancelled(msg));
+    if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+        _is_report_on_cancel = false;
+    } else {
+        LOG(WARNING) << "PipelineFragmentContext "
+                     << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
+                     << " is canceled, cancel message: " << msg;
+    }
 
-        // must close stream_mgr to avoid dead lock in Exchange Node
-        // TODO bug llj fix this other instance will not cancel
-        _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
-        // Cancel the result queue manager used by spark doris connector
-        // TODO pipeline incomp
-        // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
+    _runtime_state->set_process_status(_query_ctx->exec_status());
+    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
+    // For stream load the fragment's query_id == load id, it is set in FE.
+    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
+    if (stream_load_ctx != nullptr) {
+        stream_load_ctx->pipe->cancel(msg);
     }
+
+    // must close stream_mgr to avoid dead lock in Exchange Node
+    // TODO bug llj fix this other instance will not cancel
+    _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
+    // Cancel the result queue manager used by spark doris connector
+    // TODO pipeline incomp
+    // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
 }
 
 PipelinePtr PipelineFragmentContext::add_pipeline() {
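The deleted _cancel_lock closed exactly the cycle the new comment describes: fragment cancel held the mutex while calling up into QueryContext::cancel, and QueryContext::cancel fans back out to every fragment's cancel. A standalone sketch of that shape, using hypothetical FragmentCtx/QueryCtx stand-ins rather than the real Doris classes (the same-thread re-entrant case shown here is the simplest variant; relocking a std::mutex on the owning thread is undefined behavior and in practice hangs):

#include <memory>
#include <mutex>
#include <vector>

struct QueryCtx;

struct FragmentCtx {
    std::mutex _cancel_lock;  // analogue of the lock this commit removes
    QueryCtx* query_ctx = nullptr;
    void cancel();
};

struct QueryCtx {
    std::vector<std::weak_ptr<FragmentCtx>> fragments;
    void cancel() {
        for (auto& weak : fragments) {
            if (auto frag = weak.lock()) {
                frag->cancel();  // re-enters the fragment that initiated the call
            }
        }
    }
};

void FragmentCtx::cancel() {
    std::lock_guard<std::mutex> l(_cancel_lock);  // second acquisition never returns
    query_ctx->cancel();
}

int main() {
    QueryCtx q;
    auto f = std::make_shared<FragmentCtx>();
    f->query_ctx = &q;
    q.fragments.push_back(f);
    f->cancel();  // deadlocks on its own _cancel_lock
}

Removing the fragment-level mutex breaks the cycle; the query_context.cpp hunk below then makes QueryContext::cancel itself safe to call concurrently without a lock.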
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 8ad36612f4a..96936233b39 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -213,7 +213,6 @@ protected:
     VecDateTimeValue _start_time;
     int _timeout = -1;
 
-    std::mutex _cancel_lock;
 
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index fa53e6f4b11..4419ecbe7f4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -126,7 +126,6 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
-    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineXFragmentContext::cancel")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
@@ -135,25 +134,24 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     if (reason == PPlanFragmentCancelReason::TIMEOUT) {
         LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
     }
-    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
-        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
-            _is_report_on_cancel = false;
-        } else {
-            for (auto& id : _fragment_instance_ids) {
-                LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id);
-            }
-        }
-        // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
-        // For stream load the fragment's query_id == load id, it is set in FE.
-        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
-        if (stream_load_ctx != nullptr) {
-            stream_load_ctx->pipe->cancel(msg);
+    _query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id);
+    if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+        _is_report_on_cancel = false;
+    } else {
+        for (auto& id : _fragment_instance_ids) {
+            LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id);
         }
-
-        // Cancel the result queue manager used by spark doris connector
-        // TODO pipeline incomp
-        // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
     }
+    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
+    // For stream load the fragment's query_id == load id, it is set in FE.
+    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
+    if (stream_load_ctx != nullptr) {
+        stream_load_ctx->pipe->cancel(msg);
+    }
+
+    // Cancel the result queue manager used by spark doris connector
+    // TODO pipeline incomp
+    // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
     for (auto& tasks : _tasks) {
         for (auto& task : tasks) {
             task->clear_blocking_state();
@@ -1326,7 +1324,7 @@ void PipelineXFragmentContext::close_if_prepare_failed(Status st) {
             close_a_pipeline();
         }
    }
-    _query_ctx->cancel(true, st.to_string(), st, _fragment_id);
+    _query_ctx->cancel(st.to_string(), st, _fragment_id);
 }
 
 void PipelineXFragmentContext::_close_fragment_instance() {
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 3e03f3636fc..8819067e597 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -135,7 +135,7 @@ void BlockedTaskScheduler::_schedule() {
<< ", instance_id=" <<
print_id(task->instance_id())
<< ", task info: " << task->debug_string();
- task->query_context()->cancel(true, "", Status::Cancelled(""));
+ task->query_context()->cancel("", Status::Cancelled(""));
_make_task_run(local_blocked_tasks, iter);
} else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
if (task->has_dependency()) {
@@ -241,7 +241,7 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status
         task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                          std::string(status.msg()));
     } else {
-        task->query_context()->cancel(true, status.to_string(),
+        task->query_context()->cancel(status.to_string(),
                                       Status::Cancelled(status.to_string()));
     }
     state = PipelineTaskState::CANCELED;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d852385d265..68c4afa3821 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1002,7 +1002,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
         }
     }
-    query_ctx->cancel(true, msg, Status::Cancelled(msg));
+    query_ctx->cancel(msg, Status::Cancelled(msg));
     {
         std::lock_guard<std::mutex> state_lock(_lock);
         _query_ctx_map.erase(query_id);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 4fb5df7c7dd..681d0e333c7 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -157,12 +157,14 @@ void QueryContext::set_execution_dependency_ready() {
     _execution_dependency->set_ready();
 }
 
-bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragment_id) {
-    if (_is_cancelled) {
-        return false;
+void QueryContext::cancel(std::string msg, Status new_status, int fragment_id) {
+    // compare_exchange_strong needs an lvalue to hold the expected value
+    bool false_cancel = false;
+    if (!_is_cancelled.compare_exchange_strong(false_cancel, true)) {
+        return;
     }
+    DCHECK(!false_cancel && _is_cancelled);
     set_exec_status(new_status);
-    _is_cancelled.store(v);
     set_ready_to_execute(true);
 
     std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
@@ -175,12 +177,14 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
             ctx_to_cancel.push_back(f_context);
         }
     }
+    // Must not hold a lock here: cancelling a fragment context calls back into
+    // query ctx cancel paths, which could deadlock.
     for (auto& f_context : ctx_to_cancel) {
         if (auto pipeline_ctx = f_context.lock()) {
             pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
         }
     }
-    return true;
+    return;
 }
 
 void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason,
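With the per-fragment lock gone, the rewritten QueryContext::cancel relies on an atomic compare-and-swap so the cancel body still runs at most once. A minimal sketch of that run-once idiom, using a hypothetical CancelOnce class rather than the Doris type (only the compare_exchange_strong pattern mirrors the patch):

#include <atomic>
#include <iostream>
#include <string>
#include <thread>

class CancelOnce {
public:
    void cancel(const std::string& msg) {
        // compare_exchange_strong needs an lvalue holding the expected value.
        bool expected = false;
        if (!_is_cancelled.compare_exchange_strong(expected, true)) {
            return;  // another caller won the race; nothing left to do
        }
        // Exactly one caller gets past the CAS, even when threads race.
        std::cout << "cancelled: " << msg << "\n";
    }

private:
    std::atomic<bool> _is_cancelled {false};
};

int main() {
    CancelOnce ctx;
    std::thread t1([&] { ctx.cancel("from thread 1"); });
    std::thread t2([&] { ctx.cancel("from thread 2"); });
    t1.join();
    t2.join();  // only one "cancelled: ..." line is printed
}

The CAS replaces the old check-then-store pair (if (_is_cancelled) return false; ... _is_cancelled.store(v);), which left a window where two racing callers could both observe false and both proceed with cancellation.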
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 1551af46c95..5dd0999a63d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -114,7 +114,7 @@ public:
                                     const std::string& msg);
     void set_pipeline_context(const int fragment_id,
                               std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
-    bool cancel(bool v, std::string msg, Status new_status, int fragment_id = -1);
+    void cancel(std::string msg, Status new_status, int fragment_id = -1);
 
     void set_exec_status(Status new_status) {
         if (new_status.ok()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]