This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new cb5c1673739 branch-3.1: [refactor](pipeline) Re-construct ownership of pipeline components #49753 (#54567)
cb5c1673739 is described below

commit cb5c167373946e409a9ede31fc5d7db01abe70ca
Author: Gabriel <[email protected]>
AuthorDate: Wed Aug 13 14:27:44 2025 +0800

    branch-3.1: [refactor](pipeline) Re-construct ownership of pipeline components #49753 (#54567)
    
    picked from #49753
---
 be/src/pipeline/dependency.cpp                |  18 +++--
 be/src/pipeline/dependency.h                  |   8 +-
 be/src/pipeline/pipeline.h                    |   1 +
 be/src/pipeline/pipeline_fragment_context.cpp |  15 ++--
 be/src/pipeline/pipeline_fragment_context.h   |   2 +-
 be/src/pipeline/pipeline_task.cpp             | 101 ++++++++++++++++++--------
 be/src/pipeline/pipeline_task.h               |  32 ++++----
 be/src/pipeline/task_queue.cpp                |  20 ++---
 be/src/pipeline/task_queue.h                  |  28 +++----
 be/src/pipeline/task_scheduler.cpp            |  38 +++++-----
 be/src/pipeline/task_scheduler.h              |   2 +-
 11 files changed, 158 insertions(+), 107 deletions(-)
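
The net effect of this refactor is that a PipelineTask is now owned through std::shared_ptr (by the fragment context's task matrix and by the task queues), while a Dependency keeps only std::weak_ptr references to blocked tasks and the task itself holds a std::weak_ptr back to its fragment context. A dependency firing after a task or fragment has been released then observes an expired pointer instead of dereferencing a dangling raw pointer. Below is a minimal, compilable sketch of that pattern using simplified stand-in types (Task, Dependency); it is illustrative only and not the actual Doris classes.

// Sketch of the shared_ptr/weak_ptr ownership pattern; stand-in types only.
#include <iostream>
#include <memory>
#include <vector>

struct Dependency;

struct Task : std::enable_shared_from_this<Task> {
    // Hands a shared reference to the dependency, mirroring the patch's
    // is_blocked_by(shared_from_this()) call sites.
    void block_on(Dependency& dep);
    void wake_up() { std::cout << "task woken\n"; }
};

struct Dependency {
    // Only weak references: a dependency never extends a task's lifetime.
    std::vector<std::weak_ptr<Task>> blocked;
    void add_blocked(std::shared_ptr<Task> t) { blocked.push_back(std::move(t)); }
    void set_ready() {
        for (auto& w : blocked) {
            if (auto t = w.lock()) { // task still owned somewhere: wake it
                t->wake_up();
            }                        // task already destroyed: skipped safely
        }
        blocked.clear();
    }
};

void Task::block_on(Dependency& dep) { dep.add_blocked(shared_from_this()); }

int main() {
    Dependency dep;
    auto live_task = std::make_shared<Task>(); // e.g. held by a fragment context
    live_task->block_on(dep);
    {
        auto finished_task = std::make_shared<Task>();
        finished_task->block_on(dep);
    } // owner releases finished_task; only the weak_ptr in `dep` remains
    dep.set_ready(); // wakes live_task once; the expired entry is ignored
    return 0;
}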

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index b15005e03a8..945ee3d4521 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -46,8 +46,9 @@ Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, s
     return sink_deps.back().get();
 }
 
-void Dependency::_add_block_task(PipelineTask* task) {
-    DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task)
+void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
+    DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1].lock() == nullptr ||
+           _blocked_task[_blocked_task.size() - 1].lock().get() != task.get())
             << "Duplicate task: " << task->debug_string();
     _blocked_task.push_back(task);
 }
@@ -57,7 +58,7 @@ void Dependency::set_ready() {
         return;
     }
     _watcher.stop();
-    std::vector<PipelineTask*> local_block_task {};
+    std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
     {
         std::unique_lock<std::mutex> lc(_task_lock);
         if (_ready) {
@@ -66,12 +67,15 @@ void Dependency::set_ready() {
         _ready = true;
         local_block_task.swap(_blocked_task);
     }
-    for (auto* task : local_block_task) {
-        task->wake_up();
+    for (auto task : local_block_task) {
+        if (auto t = task.lock()) {
+            std::unique_lock<std::mutex> lc(_task_lock);
+            t->wake_up();
+        }
     }
 }
 
-Dependency* Dependency::is_blocked_by(PipelineTask* task) {
+Dependency* Dependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
     std::unique_lock<std::mutex> lc(_task_lock);
     auto ready = _ready.load();
     if (!ready && task) {
@@ -105,7 +109,7 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     return fmt::to_string(debug_string_buffer);
 }
 
-Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) {
+Dependency* RuntimeFilterDependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
     std::unique_lock<std::mutex> lc(_task_lock);
     auto ready = _ready.load();
     if (!ready && task) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ea6cacf51b1..9f07076497f 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -106,7 +106,7 @@ public:
     [[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); }
 
     // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
-    [[nodiscard]] virtual Dependency* is_blocked_by(PipelineTask* task = nullptr);
+    [[nodiscard]] virtual Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task = nullptr);
     // Notify downstream pipeline tasks this dependency is ready.
     void set_ready();
     void set_ready_to_read() {
@@ -151,7 +151,7 @@ public:
     }
 
 protected:
-    void _add_block_task(PipelineTask* task);
+    void _add_block_task(std::shared_ptr<PipelineTask> task);
 
     const int _id;
     const int _node_id;
@@ -162,7 +162,7 @@ protected:
     MonotonicStopWatch _watcher;
 
     std::mutex _task_lock;
-    std::vector<PipelineTask*> _blocked_task;
+    std::vector<std::weak_ptr<PipelineTask>> _blocked_task;
 
     // If `_always_ready` is true, `block()` will never block tasks.
     std::atomic<bool> _always_ready = false;
@@ -282,7 +282,7 @@ public:
             : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
     std::string debug_string(int indentation_level = 0) override;
 
-    Dependency* is_blocked_by(PipelineTask* task) override;
+    Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task) override;
 
 private:
     const IRuntimeFilter* _runtime_filter = nullptr;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 061a62ea99b..abfe883a804 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -135,6 +135,7 @@ public:
     }
 
     int num_tasks_of_parent() const { return _num_tasks_of_parent; }
+    std::string& name() { return _name; }
 
 private:
     void _init_profile();
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 6736ff89827..4a069483dc3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -133,18 +133,17 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     // The memory released by the query end is recorded in the query mem tracker.
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker);
     auto st = _query_ctx->exec_status();
-    _query_ctx.reset();
     for (size_t i = 0; i < _tasks.size(); i++) {
         if (!_tasks[i].empty()) {
             _call_back(_tasks[i].front()->runtime_state(), &st);
         }
     }
-    _tasks.clear();
     for (auto& runtime_states : _task_runtime_states) {
         for (auto& runtime_state : runtime_states) {
             runtime_state.reset();
         }
     }
+    _tasks.clear();
     _dag.clear();
     _pip_id_to_pipeline.clear();
     _pipelines.clear();
@@ -154,6 +153,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     _runtime_filter_states.clear();
     _runtime_filter_mgr_map.clear();
     _op_id_to_le_state.clear();
+    _query_ctx.reset();
 }
 
 bool PipelineFragmentContext::is_timeout(timespec now) const {
@@ -449,10 +449,11 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
                 auto cur_task_id = _total_tasks++;
                 task_runtime_state->set_task_id(cur_task_id);
                 task_runtime_state->set_task_num(pipeline->num_tasks());
-                auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
-                                                           task_runtime_state.get(), this,
-                                                           pipeline_id_to_profile[pip_idx].get(),
-                                                           get_local_exchange_state(pipeline), i);
+                auto task = std::make_shared<PipelineTask>(
+                        pipeline, cur_task_id, task_runtime_state.get(),
+                        std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
+                        pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
+                        i);
                 pipeline->incr_created_tasks(i, task.get());
                 task_runtime_state->set_task(task.get());
                 pipeline_id_to_task.insert({pipeline->id(), task.get()});
@@ -1675,7 +1676,7 @@ Status PipelineFragmentContext::submit() {
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
     for (auto& task : _tasks) {
         for (auto& t : task) {
-            st = scheduler->schedule_task(t.get());
+            st = scheduler->schedule_task(t);
             if (!st) {
                 cancel(Status::InternalError("submit context to executor fail"));
                 std::lock_guard<std::mutex> l(_task_mutex);
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index f119384b2fb..b08a3f90f95 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -228,7 +228,7 @@ private:
 
     OperatorPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
-    std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
+    std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
 
     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 99c6bf167b9..dfda613af52 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -48,12 +48,19 @@ namespace doris::pipeline {
 
 PipelineTask::PipelineTask(
         PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
-        PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
+        std::shared_ptr<PipelineFragmentContext> fragment_context, RuntimeProfile* parent_profile,
         std::map<int,
                  std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
                 le_state_map,
         int task_idx)
-        : _index(task_id),
+        :
+#ifdef BE_TEST
+          _query_id(fragment_context ? fragment_context->get_query_id() : TUniqueId()),
+#else
+          _query_id(fragment_context->get_query_id()),
+#endif
+          _pip_id(pipeline->id()),
+          _index(task_id),
           _pipeline(pipeline),
           _opened(false),
           _state(state),
@@ -64,7 +71,8 @@ PipelineTask::PipelineTask(
           _root(_operators.back().get()),
           _sink(pipeline->sink_shared_pointer()),
           _le_state_map(std::move(le_state_map)),
-          _task_idx(task_idx) {
+          _task_idx(task_idx),
+          _pipeline_name(_pipeline->name()) {
     _pipeline_task_watcher.start();
     _execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency());
     auto shared_state = _sink->create_shared_state();
@@ -117,8 +125,12 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
         std::copy(deps.begin(), deps.end(),
                  std::inserter(_execution_dependencies, _execution_dependencies.end()));
     }
-    if (query_context()->is_cancelled()) {
-        clear_blocking_state();
+    if (auto fragment = _fragment_context.lock()) {
+        if (fragment->get_query_ctx()->is_cancelled()) {
+            clear_blocking_state();
+        }
+    } else {
+        return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id));
     }
     return Status::OK();
 }
@@ -231,7 +243,7 @@ bool PipelineTask::_wait_to_start() {
     // 2. Runtime filter dependencies are ready
     // 3. All tablets are loaded into local storage
     for (auto* op_dep : _execution_dependencies) {
-        _blocked_dep = op_dep->is_blocked_by(this);
+        _blocked_dep = op_dep->is_blocked_by(shared_from_this());
         if (_blocked_dep != nullptr) {
             _blocked_dep->start_watcher();
             return true;
@@ -252,7 +264,7 @@ bool PipelineTask::_is_blocked() {
         for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
             // `_read_dependencies` is organized according to operators. For each operator, running condition is met iff all dependencies are ready.
             for (auto* dep : _read_dependencies[i]) {
-                _blocked_dep = dep->is_blocked_by(this);
+                _blocked_dep = dep->is_blocked_by(shared_from_this());
                 if (_blocked_dep != nullptr) {
                     _blocked_dep->start_watcher();
                     return true;
@@ -271,7 +283,7 @@ bool PipelineTask::_is_blocked() {
     }
 
     for (auto* op_dep : _write_dependencies) {
-        _blocked_dep = op_dep->is_blocked_by(this);
+        _blocked_dep = op_dep->is_blocked_by(shared_from_this());
         if (_blocked_dep != nullptr) {
             _blocked_dep->start_watcher();
             return true;
@@ -281,6 +293,10 @@ bool PipelineTask::_is_blocked() {
 }
 
 Status PipelineTask::execute(bool* eos) {
+    auto fragment_context = _fragment_context.lock();
+    if (!fragment_context) {
+        return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id));
+    }
     if (_eos) {
         *eos = true;
         return Status::OK();
@@ -303,11 +319,11 @@ Status PipelineTask::execute(bool* eos) {
         }
         int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
         _task_cpu_timer->update(delta_cpu_time);
-        auto cpu_qs = query_context()->get_cpu_statistics();
+        auto cpu_qs = fragment_context->get_query_ctx()->get_cpu_statistics();
         if (cpu_qs) {
             cpu_qs->add_cpu_nanos(delta_cpu_time);
         }
-        query_context()->update_cpu_time(delta_cpu_time);
+        fragment_context->get_query_ctx()->update_cpu_time(delta_cpu_time);
     }};
     if (!_wake_up_early) {
         RETURN_IF_ERROR(_prepare());
@@ -318,7 +334,7 @@ Status PipelineTask::execute(bool* eos) {
     RETURN_IF_ERROR(_prepare());
 
     // The status must be runnable
-    if (!_opened && !_fragment_context->is_canceled()) {
+    if (!_opened && !fragment_context->is_canceled()) {
         DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
            auto required_pipeline_id =
                    DebugPoints::instance()->get_debug_param_or_default<int32_t>(
@@ -350,7 +366,7 @@ Status PipelineTask::execute(bool* eos) {
 
     _task_profile->add_info_string("TaskState", "Runnable");
     _task_profile->add_info_string("BlockedByDependency", "");
-    while (!_fragment_context->is_canceled()) {
+    while (!fragment_context->is_canceled()) {
         SCOPED_RAW_TIMER(&time_spent);
         if (_is_blocked()) {
             return Status::OK();
@@ -359,7 +375,7 @@ Status PipelineTask::execute(bool* eos) {
         /// When a task is cancelled,
        /// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
        /// Here, checking whether it is cancelled to prevent tasks in a blocking state from being re-executed.
-        if (_fragment_context->is_canceled()) {
+        if (fragment_context->is_canceled()) {
             break;
         }
 
@@ -428,7 +444,7 @@ Status PipelineTask::execute(bool* eos) {
         }
     }
 
-    RETURN_IF_ERROR(get_task_queue()->push_back(this));
+    RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this()));
     return Status::OK();
 }
 
@@ -489,12 +505,34 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
     }
 }
 
+void PipelineTask::stop_if_finished() {
+    auto fragment = _fragment_context.lock();
+    if (!fragment) {
+        return;
+    }
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker);
+    if (auto sink = _sink) {
+        if (sink->is_finished(_state)) {
+            clear_blocking_state();
+        }
+    }
+}
+
 void PipelineTask::finalize() {
+    auto fragment = _fragment_context.lock();
+    if (!fragment) {
+        return;
+    }
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker);
     std::unique_lock<std::mutex> lc(_dependency_lock);
     _finalized = true;
     _sink_shared_state.reset();
     _op_shared_states.clear();
     _le_state_map.clear();
+    _block.reset();
+    _operators.clear();
+    _sink.reset();
+    _pipeline.reset();
 }
 
 Status PipelineTask::close(Status exec_status, bool close_sink) {
@@ -529,31 +567,37 @@ Status PipelineTask::close(Status exec_status, bool close_sink) {
 }
 
 std::string PipelineTask::debug_string() {
-    std::unique_lock<std::mutex> lc(_dependency_lock);
     fmt::memory_buffer debug_string_buffer;
 
-    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
+    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(_query_id));
     fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
                    print_id(_state->fragment_instance_id()));
 
+    fmt::format_to(
+            debug_string_buffer,
+            "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finalized = 
{}, dry run = "
+            "{}, _wake_up_early = {}, is running = {}]",
+            (void*)this, _index, _opened, _eos, _finalized, _dry_run, 
_wake_up_early.load(),
+            is_running());
+    std::unique_lock<std::mutex> lc(_dependency_lock);
     auto* cur_blocked_dep = _blocked_dep;
-    auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
+    auto fragment = _fragment_context.lock();
+    if (is_finalized() || !fragment) {
+        fmt::format_to(debug_string_buffer, " pipeline name = {}", _pipeline_name);
+        return fmt::to_string(debug_string_buffer);
+    }
+    auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
     fmt::format_to(debug_string_buffer,
-                   "PipelineTask[this = {}, id = {}, open = {}, eos = {}, 
finish = {}, dry run = "
-                   "{}, elapse time = {}s, _wake_up_early = {}], block 
dependency = {}, is "
-                   "running = {}\noperators: ",
-                   (void*)this, _index, _opened, _eos, _finalized, _dry_run, 
elapsed,
-                   _wake_up_early.load(),
-                   cur_blocked_dep && !_finalized ? 
cur_blocked_dep->debug_string() : "NULL",
-                   is_running());
+                   " elapse time = {}s, block dependency = [{}]\noperators: ", 
elapsed,
+                   cur_blocked_dep && !is_finalized() ? 
cur_blocked_dep->debug_string() : "NULL");
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}",
                        _opened && !_finalized ? 
_operators[i]->debug_string(_state, i)
                                               : 
_operators[i]->debug_string(i));
     }
     fmt::format_to(debug_string_buffer, "\n{}\n",
-                   _opened && !_finalized ? _sink->debug_string(_state, _operators.size())
-                                          : _sink->debug_string(_operators.size()));
+                   _opened && !is_finalized() ? _sink->debug_string(_state, _operators.size())
+                                              : _sink->debug_string(_operators.size()));
     if (_finalized) {
         return fmt::to_string(debug_string_buffer);
     }
@@ -588,10 +632,7 @@ std::string PipelineTask::debug_string() {
 
 void PipelineTask::wake_up() {
     // call by dependency
-    static_cast<void>(get_task_queue()->push_back(this));
+    static_cast<void>(get_task_queue()->push_back(shared_from_this()));
 }
 
-QueryContext* PipelineTask::query_context() {
-    return _fragment_context->get_query_ctx();
-}
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 64f5f6b1674..3a50436280d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -45,10 +45,11 @@ class TaskQueue;
 class PriorityTaskQueue;
 class Dependency;
 
-class PipelineTask {
+class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
 public:
     PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
-                 PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
+                 std::shared_ptr<PipelineFragmentContext> fragment_context,
+                 RuntimeProfile* parent_profile,
                  std::map<int, 
std::pair<std::shared_ptr<LocalExchangeSharedState>,
                                          std::shared_ptr<Dependency>>>
                          le_state_map,
@@ -63,9 +64,7 @@ public:
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status, bool close_sink = true);
 
-    PipelineFragmentContext* fragment_context() { return _fragment_context; }
-
-    QueryContext* query_context();
+    std::weak_ptr<PipelineFragmentContext>& fragment_context() { return 
_fragment_context; }
 
     int get_previous_core_id() const {
         return _previous_schedule_id != -1 ? _previous_schedule_id
@@ -87,7 +86,7 @@ public:
 
     bool is_pending_finish() {
         for (auto* fin_dep : _finish_dependencies) {
-            _blocked_dep = fin_dep->is_blocked_by(this);
+            _blocked_dep = fin_dep->is_blocked_by(shared_from_this());
             if (_blocked_dep != nullptr) {
                 _blocked_dep->start_watcher();
                 return true;
@@ -138,10 +137,11 @@ public:
     void set_wake_up_early() { _wake_up_early = true; }
 
     void clear_blocking_state() {
+        auto fragment = _fragment_context.lock();
         _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
-        if (!_finalized) {
+        if (!_finalized && fragment) {
            std::for_each(_execution_dependencies.begin(), _execution_dependencies.end(),
                           [&](Dependency* dep) { dep->set_ready(); });
             for (auto& deps : _read_dependencies) {
@@ -227,13 +227,9 @@ public:
 
     std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
 
-    void stop_if_finished() {
-        if (_sink->is_finished(_state)) {
-            clear_blocking_state();
-        }
-    }
+    void stop_if_finished();
 
-    PipelineId pipeline_id() const { return _pipeline->id(); }
+    PipelineId pipeline_id() const { return _pip_id; }
 
     bool wake_up_early() const { return _wake_up_early; }
 
@@ -248,7 +244,9 @@ private:
     Status _open();
     Status _prepare();
 
-    uint32_t _index;
+    const TUniqueId _query_id;
+    const PipelineId _pip_id;
+    const uint32_t _index;
     PipelinePtr _pipeline;
     bool _has_exceed_timeout = false;
     bool _opened;
@@ -256,7 +254,7 @@ private:
     int _previous_schedule_id = -1;
     uint32_t _schedule_time = 0;
     std::unique_ptr<doris::vectorized::Block> _block;
-    PipelineFragmentContext* _fragment_context = nullptr;
+    std::weak_ptr<PipelineFragmentContext> _fragment_context;
     TaskQueue* _task_queue = nullptr;
 
     // used for priority queue
@@ -317,6 +315,10 @@ private:
     std::atomic<bool> _running = false;
     std::atomic<bool> _eos = false;
     std::atomic<bool> _wake_up_early = false;
+    const std::string _pipeline_name;
 };
 
+using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
+using PipelineTaskWPtr = std::weak_ptr<PipelineTask>;
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index b91b74dfdef..43984d03a9d 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -30,7 +30,7 @@ namespace doris::pipeline {
 
 TaskQueue::~TaskQueue() = default;
 
-PipelineTask* SubTaskQueue::try_take(bool is_steal) {
+PipelineTaskSPtr SubTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
         return nullptr;
     }
@@ -56,7 +56,7 @@ void PriorityTaskQueue::close() {
     DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
 }
 
-PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
+PipelineTaskSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
@@ -93,13 +93,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
     return SUB_QUEUE_LEVEL - 1;
 }
 
-PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
+PipelineTaskSPtr PriorityTaskQueue::try_take(bool is_steal) {
     // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     return _try_take_unprotected(is_steal);
 }
 
-PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
+PipelineTaskSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     auto task = _try_take_unprotected(false);
     if (task) {
@@ -114,7 +114,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
     }
 }
 
-Status PriorityTaskQueue::push(PipelineTask* task) {
+Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
@@ -154,8 +154,8 @@ void MultiCoreTaskQueue::close() {
     }
 }
 
-PipelineTask* MultiCoreTaskQueue::take(int core_id) {
-    PipelineTask* task = nullptr;
+PipelineTaskSPtr MultiCoreTaskQueue::take(int core_id) {
+    PipelineTaskSPtr task = nullptr;
     while (!_closed) {
         DCHECK(_prio_task_queue_list->size() > core_id)
                 << " list size: " << _prio_task_queue_list->size() << " 
core_id: " << core_id
@@ -181,7 +181,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(
+PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(
        int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
     DCHECK(core_id < _core_size);
     int next_id = core_id;
@@ -200,7 +200,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(
     return nullptr;
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
+Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
     int core_id = task->get_previous_core_id();
     if (core_id < 0) {
         core_id = _next_core.fetch_add(1) % _core_size;
@@ -208,7 +208,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
     return push_back(task, core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
+Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
     DCHECK(core_id < _core_size);
     task->put_in_runnable_queue();
     return (*_prio_task_queue_list)[core_id]->push(task);
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index b389ebc2c51..d8b4ee27a7f 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -40,13 +40,13 @@ public:
     virtual void close() = 0;
     // Get the task by core id.
     // TODO: To think the logic is useful?
-    virtual PipelineTask* take(int core_id) = 0;
+    virtual PipelineTaskSPtr take(int core_id) = 0;
 
     // push from scheduler
-    virtual Status push_back(PipelineTask* task) = 0;
+    virtual Status push_back(PipelineTaskSPtr task) = 0;
 
     // push from worker
-    virtual Status push_back(PipelineTask* task, int core_id) = 0;
+    virtual Status push_back(PipelineTaskSPtr task, int core_id) = 0;
 
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
@@ -61,9 +61,9 @@ class SubTaskQueue {
     friend class PriorityTaskQueue;
 
 public:
-    void push_back(PipelineTask* task) { _queue.emplace(task); }
+    void push_back(PipelineTaskSPtr task) { _queue.emplace(task); }
 
-    PipelineTask* try_take(bool is_steal);
+    PipelineTaskSPtr try_take(bool is_steal);
 
     void set_level_factor(double level_factor) { _level_factor = level_factor; }
 
@@ -79,7 +79,7 @@ public:
     bool empty() { return _queue.empty(); }
 
 private:
-    std::queue<PipelineTask*> _queue;
+    std::queue<PipelineTaskSPtr> _queue;
     // depends on LEVEL_QUEUE_TIME_FACTOR
     double _level_factor = 1;
 
@@ -93,18 +93,18 @@ public:
 
     void close();
 
-    PipelineTask* try_take(bool is_steal);
+    PipelineTaskSPtr try_take(bool is_steal);
 
-    PipelineTask* take(uint32_t timeout_ms = 0);
+    PipelineTaskSPtr take(uint32_t timeout_ms = 0);
 
-    Status push(PipelineTask* task);
+    Status push(PipelineTaskSPtr task);
 
     void inc_sub_queue_runtime(int level, uint64_t runtime) {
         _sub_queues[level].inc_runtime(runtime);
     }
 
 private:
-    PipelineTask* _try_take_unprotected(bool is_steal);
+    PipelineTaskSPtr _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
     static constexpr size_t SUB_QUEUE_LEVEL = 6;
     SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -133,17 +133,17 @@ public:
     void close() override;
 
     // Get the task by core id.
-    PipelineTask* take(int core_id) override;
+    PipelineTaskSPtr take(int core_id) override;
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
-    Status push_back(PipelineTask* task) override;
+    Status push_back(PipelineTaskSPtr task) override;
 
-    Status push_back(PipelineTask* task, int core_id) override;
+    Status push_back(PipelineTaskSPtr task, int core_id) override;
 
     void update_statistics(PipelineTask* task, int64_t time_spent) override;
 
 private:
-    PipelineTask* _steal_take(
+    PipelineTaskSPtr _steal_take(
            int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
 
     std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> 
_prio_task_queue_list;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 1fb3fbb3c36..399bf416aec 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -66,19 +66,16 @@ Status TaskScheduler::start() {
     return Status::OK();
 }
 
-Status TaskScheduler::schedule_task(PipelineTask* task) {
+Status TaskScheduler::schedule_task(PipelineTaskSPtr task) {
     return _task_queue->push_back(task);
 }
 
 // after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, Status exec_status) {
+void _close_task(PipelineTask* task, Status exec_status, PipelineFragmentContext* ctx) {
     // Has to attach memory tracker here, because the close task will also release some memory.
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
     SCOPED_ATTACH_TASK(task->runtime_state());
-    // close_a_pipeline may delete fragment context and will core in some defer
-    // code, because the defer code will access fragment context it self.
-    auto lock_for_context = task->fragment_context()->shared_from_this();
     // is_pending_finish does not check status, so has to check status in close API.
     // For example, in async writer, the writer may failed during dealing with eos_block
     // but it does not return error status. Has to check the error status in close API.
@@ -86,16 +83,16 @@ void _close_task(PipelineTask* task, Status exec_status) {
     // for pending finish now. So that could call close directly.
     Status status = task->close(exec_status);
     if (!status.ok()) {
-        task->fragment_context()->cancel(status);
+        ctx->cancel(status);
     }
     task->finalize();
     task->set_running(false);
-    task->fragment_context()->close_a_pipeline(task->pipeline_id());
+    ctx->close_a_pipeline(task->pipeline_id());
 }
 
 void TaskScheduler::_do_work(size_t index) {
     while (_markers[index]) {
-        auto* task = _task_queue->take(index);
+        auto task = _task_queue->take(index);
         if (!task) {
             continue;
         }
@@ -106,11 +103,15 @@ void TaskScheduler::_do_work(size_t index) {
         if (task->is_finalized()) {
             continue;
         }
+        auto fragment_context = task->fragment_context().lock();
+        if (!fragment_context) {
+            // Fragment already finished for this query
+            continue;
+        }
         task->log_detail_if_need();
         task->set_running(true);
         task->set_task_queue(_task_queue.get());
-        auto* fragment_ctx = task->fragment_context();
-        bool canceled = fragment_ctx->is_canceled();
+        bool canceled = fragment_context->is_canceled();
 
        // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
         // has to return false. The task is finished and need to close now.
@@ -121,7 +122,8 @@ void TaskScheduler::_do_work(size_t index) {
            // If pipeline is canceled, it will report after pipeline closed, and will propagate
            // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
-            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+            _close_task(task.get(), fragment_context->get_query_ctx()->exec_status(),
+                        fragment_context.get());
             continue;
         }
 
@@ -137,7 +139,7 @@ void TaskScheduler::_do_work(size_t index) {
         ASSIGN_STATUS_IF_CATCH_EXCEPTION(
                 //TODO: use a better enclose to abstracting these
                if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
-                    TUniqueId query_id = task->query_context()->query_id();
+                    TUniqueId query_id = fragment_context->get_query_id();
                     std::string task_name = task->task_name();
 
                     std::thread::id tid = std::this_thread::get_id();
@@ -160,14 +162,14 @@ void TaskScheduler::_do_work(size_t index) {
             // LOG(WARNING)<< "task:\n"<<task->debug_string();
 
             // exec failed,cancel all fragment instance
-            fragment_ctx->cancel(status);
+            fragment_context->cancel(status);
             LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} 
reason: {}",
-                                        
print_id(task->query_context()->query_id()),
+                                        
print_id(fragment_context->get_query_ctx()->query_id()),
                                         status.to_string());
-            _close_task(task, status);
+            _close_task(task.get(), status, fragment_context.get());
             continue;
         }
-        fragment_ctx->trigger_report_if_necessary();
+        fragment_context->trigger_report_if_necessary();
 
         if (eos) {
            // is pending finish will add the task to dependency's blocking queue, and then the task will be
@@ -176,8 +178,8 @@ void TaskScheduler::_do_work(size_t index) {
                 // Only meet eos, should set task to PENDING_FINISH state
                 task->set_running(false);
             } else {
-                Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-                _close_task(task, exec_status);
+                Status exec_status = fragment_context->get_query_ctx()->exec_status();
+                _close_task(task.get(), exec_status, fragment_context.get());
             }
             continue;
         }
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 9a20807ea26..0307d0603ec 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -55,7 +55,7 @@ public:
 
     ~TaskScheduler();
 
-    Status schedule_task(PipelineTask* task);
+    Status schedule_task(PipelineTaskSPtr task);
 
     Status start();
 

