This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 4e6c22ef81c [pipeline](fix) Prevent re-cancel pipeline tasks (#34073)
4e6c22ef81c is described below
commit 4e6c22ef81c6ccb461f87abe298ef710bf19ec5e
Author: Gabriel <[email protected]>
AuthorDate: Wed Apr 24 21:49:11 2024 +0800
[pipeline](fix) Prevent re-cancel pipeline tasks (#34073)
---
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index c62ed04e580..53d3a1bc5d0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -130,6 +130,13 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
+ {
+ std::lock_guard<std::mutex> l(_task_mutex);
+ if (_closed_tasks == _total_tasks) {
+ // All tasks in this PipelineXFragmentContext already closed.
+ return;
+ }
+ }
LOG_INFO("PipelineXFragmentContext::cancel")
.tag("query_id", print_id(_query_id))
.tag("fragment_id", _fragment_id)
@@ -158,6 +165,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
// _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
+ if (task->is_finished()) {
+ continue;
+ }
task->clear_blocking_state();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]