This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aa766a7b4ac [bugfix](pipeline core) lock fragment context during task
close to avoid concurrent delete (#27484)
aa766a7b4ac is described below
commit aa766a7b4ac5e1d647fdb73a8316b4d6014c26ae
Author: yiguolei <[email protected]>
AuthorDate: Thu Nov 23 21:43:39 2023 +0800
[bugfix](pipeline core) lock fragment context during task close to avoid
concurrent delete (#27484)
---
be/src/pipeline/task_scheduler.cpp | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index cc6e502fb5e..0fffdb72a6c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -352,6 +352,10 @@ void TaskScheduler::_do_work(size_t index) {
void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState
state,
Status exec_status) {
+ // close_a_pipeline may delete fragment context and will core in some defer
+ // code, because the defer code will access fragment context it self.
+ std::shared_ptr<PipelineFragmentContext> lock_for_context =
+ task->fragment_context()->shared_from_this();
auto status = task->try_close(exec_status);
auto cancel = [&]() {
task->query_context()->cancel(true, status.to_string(),
@@ -365,6 +369,9 @@ void TaskScheduler::_try_close_task(PipelineTask* task,
PipelineTaskState state,
}
if (!task->is_pipelineX() && task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
+ // After the task is added to the block queue, it maybe run by another
thread
+ // and the task maybe released in the other thread. And will core at
+ // task set running.
static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
task->set_running(false);
return;
@@ -382,10 +389,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task,
PipelineTaskState state,
task->set_close_pipeline_time();
task->finalize();
task->set_running(false);
- // close_a_pipeline may delete fragment context and will core in some defer
- // code, because the defer code will access fragment context it self.
- std::shared_ptr<PipelineFragmentContext> lock_for_context =
- task->fragment_context()->shared_from_this();
task->fragment_context()->close_a_pipeline();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]