This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c58b3e934da branch-3.1: (memtracker) memory not consumed by memtracker 
#55796 (#55823)
c58b3e934da is described below

commit c58b3e934daabc1262fbff26df74859d716e8d5d
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 11 10:46:11 2025 +0800

    branch-3.1: (memtracker) memory not consumed by memtracker #55796 (#55823)
    
    picked from #55796
---
 be/src/pipeline/pipeline_task.cpp | 19 ++++++++++++++++++-
 be/src/pipeline/pipeline_task.h   |  3 +++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index dfda613af52..04f869007da 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -74,13 +74,30 @@ PipelineTask::PipelineTask(
           _task_idx(task_idx),
           _pipeline_name(_pipeline->name()) {
     _pipeline_task_watcher.start();
+#ifndef BE_TEST
+    _query_mem_tracker = fragment_context->get_query_ctx()->query_mem_tracker;
+#endif
     
_execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency());
     auto shared_state = _sink->create_shared_state();
     if (shared_state) {
         _sink_shared_state = shared_state;
     }
 }
-
+PipelineTask::~PipelineTask() {
+// PipelineTask is also hold by task queue( 
https://github.com/apache/doris/pull/49753),
+// so that it maybe the last one to be destructed.
+// But pipeline task hold some objects, like operators, shared state, etc. So 
that should release
+// memory manually.
+#ifndef BE_TEST
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+#endif
+    _sink_shared_state.reset();
+    _op_shared_states.clear();
+    _sink.reset();
+    _operators.clear();
+    _block.reset();
+    _pipeline.reset();
+}
 Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, 
const TDataSink& tsink,
                              QueryContext* query_ctx) {
     DCHECK(_sink);
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 3a50436280d..645879b7043 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -54,6 +54,7 @@ public:
                                          std::shared_ptr<Dependency>>>
                          le_state_map,
                  int task_idx);
+    ~PipelineTask();
 
     Status prepare(const TPipelineInstanceParams& local_params, const 
TDataSink& tsink,
                    QueryContext* query_ctx);
@@ -316,6 +317,8 @@ private:
     std::atomic<bool> _eos = false;
     std::atomic<bool> _wake_up_early = false;
     const std::string _pipeline_name;
+    // PipelineTask maybe hold by TaskQueue
+    std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
 };
 
 using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to