This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aa766a7b4ac [bugfix](pipeline core) lock fragment context during task close to avoid concurrent delete (#27484)
aa766a7b4ac is described below

commit aa766a7b4ac5e1d647fdb73a8316b4d6014c26ae
Author: yiguolei <[email protected]>
AuthorDate: Thu Nov 23 21:43:39 2023 +0800

    [bugfix](pipeline core) lock fragment context during task close to avoid concurrent delete (#27484)
---
 be/src/pipeline/task_scheduler.cpp | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index cc6e502fb5e..0fffdb72a6c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -352,6 +352,10 @@ void TaskScheduler::_do_work(size_t index) {
 
 void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
                                     Status exec_status) {
+    // close_a_pipeline may delete fragment context and will core in some defer
+    // code, because the defer code will access fragment context it self.
+    std::shared_ptr<PipelineFragmentContext> lock_for_context =
+            task->fragment_context()->shared_from_this();
     auto status = task->try_close(exec_status);
     auto cancel = [&]() {
         task->query_context()->cancel(true, status.to_string(),
@@ -365,6 +369,9 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
     }
     if (!task->is_pipelineX() && task->is_pending_finish()) {
         task->set_state(PipelineTaskState::PENDING_FINISH);
+        // After the task is added to the block queue, it maybe run by another thread
+        // and the task maybe released in the other thread. And will core at
+        // task set running.
         static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
         task->set_running(false);
         return;
@@ -382,10 +389,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
     task->set_close_pipeline_time();
     task->finalize();
     task->set_running(false);
-    // close_a_pipeline may delete fragment context and will core in some defer
-    // code, because the defer code will access fragment context it self.
-    std::shared_ptr<PipelineFragmentContext> lock_for_context =
-            task->fragment_context()->shared_from_this();
     task->fragment_context()->close_a_pipeline();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to