This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 97fcad76f8 [enhancement](memtracker) Improve readability (#15716)
97fcad76f8 is described below

commit 97fcad76f83adccc43eaff995bf1d570ded30605
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Jan 16 16:30:35 2023 +0800

    [enhancement](memtracker) Improve readability (#15716)
---
 be/src/exec/exec_node.cpp                       | 15 ++-------------
 be/src/exec/exec_node.h                         | 12 ++++--------
 be/src/http/default_path_handlers.cpp           |  6 ++----
 be/src/olap/delta_writer.cpp                    |  4 ++--
 be/src/olap/memtable.cpp                        |  4 ++--
 be/src/olap/olap_server.cpp                     | 10 ++--------
 be/src/olap/push_handler.cpp                    |  4 ++--
 be/src/olap/storage_engine.cpp                  |  7 ++-----
 be/src/olap/storage_engine.h                    |  2 --
 be/src/olap/tablet_manager.cpp                  | 11 ++++++++++-
 be/src/olap/tablet_manager.h                    |  1 +
 be/src/olap/task/engine_batch_load_task.cpp     |  2 +-
 be/src/olap/task/engine_checksum_task.cpp       |  2 +-
 be/src/pipeline/exec/operator.h                 | 10 ++++++----
 be/src/pipeline/pipeline_fragment_context.cpp   |  1 -
 be/src/runtime/exec_env.h                       |  7 +++----
 be/src/runtime/exec_env_init.cpp                | 12 +++++++++---
 be/src/runtime/load_channel_mgr.cpp             |  7 +++----
 be/src/runtime/load_channel_mgr.h               | 14 +++++++-------
 be/src/runtime/memory/mem_tracker.cpp           | 14 +++++++++++---
 be/src/runtime/memory/mem_tracker.h             | 14 +++++++-------
 be/src/runtime/memory/mem_tracker_limiter.cpp   | 21 +++++++++++++--------
 be/src/runtime/memory/mem_tracker_limiter.h     | 15 ++++++---------
 be/src/runtime/plan_fragment_executor.cpp       |  1 -
 be/src/runtime/runtime_filter_mgr.cpp           |  6 ++++--
 be/src/runtime/runtime_state.cpp                |  2 --
 be/src/runtime/runtime_state.h                  |  8 --------
 be/src/vec/exec/join/vhash_join_node.cpp        |  2 --
 be/src/vec/exec/join/vjoin_node_base.cpp        |  2 --
 be/src/vec/exec/join/vnested_loop_join_node.cpp |  3 ---
 be/src/vec/exec/scan/new_es_scan_node.cpp       |  1 -
 be/src/vec/exec/scan/new_jdbc_scan_node.cpp     |  1 -
 be/src/vec/exec/scan/new_odbc_scan_node.cpp     |  1 -
 be/src/vec/exec/scan/new_olap_scan_node.cpp     |  1 -
 be/src/vec/exec/scan/scanner_scheduler.cpp      |  1 -
 be/src/vec/exec/scan/vscan_node.cpp             |  4 ----
 be/src/vec/exec/vaggregation_node.cpp           | 14 +++++---------
 be/src/vec/exec/vanalytic_eval_node.cpp         | 16 ++--------------
 be/src/vec/exec/vbroker_scan_node.cpp           |  4 ----
 be/src/vec/exec/vexchange_node.cpp              |  3 ---
 be/src/vec/exec/vmysql_scan_node.cpp            |  2 --
 be/src/vec/exec/vrepeat_node.cpp                |  2 --
 be/src/vec/exec/vschema_scan_node.cpp           |  2 --
 be/src/vec/exec/vset_operation_node.cpp         |  2 --
 be/src/vec/exec/vsort_node.cpp                  |  6 +-----
 be/src/vec/exec/vtable_function_node.cpp        |  1 -
 be/src/vec/runtime/vdata_stream_recvr.cpp       |  5 +++--
 be/src/vec/sink/vdata_stream_sender.cpp         |  3 ++-
 be/test/testutil/run_all_tests.cpp              |  5 +----
 49 files changed, 114 insertions(+), 179 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 9bd0c5c494..e0c85d9816 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -142,18 +142,8 @@ Status ExecNode::prepare(RuntimeState* state) {
             std::bind<int64_t>(&RuntimeProfile::units_per_second, 
_rows_returned_counter,
                                runtime_profile()->total_time_counter()),
             "");
-    _mem_tracker_held =
-            std::make_unique<MemTracker>("ExecNode:" + 
_runtime_profile->name(),
-                                         _runtime_profile.get(), nullptr, 
"PeakMemoryUsage");
-    // Only when the query profile is enabled, the node allocated memory will 
be track through the mem hook,
-    // otherwise _mem_tracker_growh is nullptr, and SCOPED_CONSUME_MEM_TRACKER 
will do nothing.
-    if (state->query_options().__isset.is_report_success &&
-        state->query_options().is_report_success) {
-        _mem_tracker_growh = std::make_shared<MemTracker>(
-                "ExecNode:MemoryOnlyTrackAlloc:" + _runtime_profile->name(), 
_runtime_profile.get(),
-                nullptr, "MemoryOnlyTrackAllocNoConsiderFree", true);
-    }
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+    _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + 
_runtime_profile->name(),
+                                                _runtime_profile.get(), 
nullptr, "PeakMemoryUsage");
 
     if (_vconjunct_ctx_ptr) {
         RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, 
intermediate_row_desc()));
@@ -176,7 +166,6 @@ Status ExecNode::prepare(RuntimeState* state) {
 }
 
 Status ExecNode::alloc_resource(doris::RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     if (_vconjunct_ctx_ptr) {
         RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
     }
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 353db077f3..76ab99fd42 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -229,9 +229,7 @@ public:
     RuntimeProfile* runtime_profile() const { return _runtime_profile.get(); }
     RuntimeProfile::Counter* memory_used_counter() const { return 
_memory_used_counter; }
 
-    MemTracker* mem_tracker_held() const { return _mem_tracker_held.get(); }
-    MemTracker* mem_tracker_growh() const { return _mem_tracker_growh.get(); }
-    std::shared_ptr<MemTracker> mem_tracker_growh_shared() const { return 
_mem_tracker_growh; }
+    MemTracker* mem_tracker() const { return _mem_tracker.get(); }
 
     OpentelemetrySpan get_next_span() { return _get_next_span; }
 
@@ -279,11 +277,9 @@ protected:
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
-    // Record the memory size held by this node.
-    std::unique_ptr<MemTracker> _mem_tracker_held;
-    // Record the memory size allocated by this node.
-    // Similar to tcmalloc heap profile growh, only track memory alloc, not 
track memory free.
-    std::shared_ptr<MemTracker> _mem_tracker_growh;
+    // Record this node memory size. it is expected that artificial guarantees 
are accurate,
+    // which will provide a reference for operator memory.
+    std::unique_ptr<MemTracker> _mem_tracker;
 
     RuntimeProfile::Counter* _rows_returned_counter;
     RuntimeProfile::Counter* _rows_returned_rate;
diff --git a/be/src/http/default_path_handlers.cpp 
b/be/src/http/default_path_handlers.cpp
index 42a29d21e9..e0ddfbbb37 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -147,11 +147,9 @@ void mem_tracker_handler(const 
WebPageHandler::ArgumentMap& args, std::stringstr
                                                    
MemTrackerLimiter::Type::SCHEMA_CHANGE);
         } else if (iter->second == "clone") {
             MemTrackerLimiter::make_type_snapshots(&snapshots, 
MemTrackerLimiter::Type::CLONE);
-        } else if (iter->second == "batch_load") {
-            MemTrackerLimiter::make_type_snapshots(&snapshots, 
MemTrackerLimiter::Type::BATCHLOAD);
-        } else if (iter->second == "consistency") {
+        } else if (iter->second == "experimental") {
             MemTrackerLimiter::make_type_snapshots(&snapshots,
-                                                   
MemTrackerLimiter::Type::CONSISTENCY);
+                                                   
MemTrackerLimiter::Type::EXPERIMENTAL);
         }
     } else {
         (*output) << "<h4>*Note: (see documentation for details)</h4>\n";
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c001b67cad..101b46bf6a 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -245,11 +245,11 @@ void DeltaWriter::_reset_mem_table() {
     auto mem_table_insert_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
                         std::to_string(tablet_id()), _mem_table_num, 
_load_id.to_string()),
-            nullptr, 
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
     auto mem_table_flush_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
                         std::to_string(tablet_id()), _mem_table_num++, 
_load_id.to_string()),
-            nullptr, 
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
 #else
     auto mem_table_insert_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ea1188e874..53d785f2c5 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -57,8 +57,8 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, 
const TabletSchema* t
           _cur_max_version(cur_max_version) {
 #ifndef BE_TEST
     _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
-            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())), nullptr,
-            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())),
+            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
 #else
     _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
             fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())));
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 2d5d12c860..7ab1bf1a97 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -120,20 +120,14 @@ Status StorageEngine::start_bg_threads() {
             scoped_refptr<Thread> path_scan_thread;
             RETURN_IF_ERROR(Thread::create(
                     "StorageEngine", "path_scan_thread",
-                    [this, data_dir]() {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-                        this->_path_scan_thread_callback(data_dir);
-                    },
+                    [this, data_dir]() { 
this->_path_scan_thread_callback(data_dir); },
                     &path_scan_thread));
             _path_scan_threads.emplace_back(path_scan_thread);
 
             scoped_refptr<Thread> path_gc_thread;
             RETURN_IF_ERROR(Thread::create(
                     "StorageEngine", "path_gc_thread",
-                    [this, data_dir]() {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-                        this->_path_gc_thread_callback(data_dir);
-                    },
+                    [this, data_dir]() { 
this->_path_gc_thread_callback(data_dir); },
                     &path_gc_thread));
             _path_gc_threads.emplace_back(path_gc_thread);
         }
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index ad3bb25e79..49b01f6418 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -833,8 +833,8 @@ Status PushBrokerReader::init(const Schema* schema, const 
TBrokerScanRange& t_sc
     }
     _runtime_profile = _runtime_state->runtime_profile();
     _runtime_profile->set_name("PushBrokerReader");
-    _mem_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get()));
-    _tuple_buffer_pool.reset(new 
MemPool(_runtime_state->scanner_mem_tracker().get()));
+    _mem_pool.reset(new MemPool());
+    _tuple_buffer_pool.reset(new MemPool());
 
     _counter.reset(new ScannerCounter());
 
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index ffa137c26f..cea5856dbb 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -108,7 +108,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _effective_cluster_id(-1),
           _is_all_cluster_id_exist(true),
           _stopped(false),
-          _mem_tracker(std::make_shared<MemTracker>("StorageEngine")),
           
_segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
           
_segment_meta_mem_tracker(std::make_shared<MemTracker>("SegmentMeta")),
           _stop_background_threads_latch(1),
@@ -149,8 +148,7 @@ StorageEngine::~StorageEngine() {
 void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
     std::vector<std::thread> threads;
     for (auto data_dir : data_dirs) {
-        threads.emplace_back([this, data_dir] {
-            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+        threads.emplace_back([data_dir] {
             auto res = data_dir->load();
             if (!res.ok()) {
                 LOG(WARNING) << "io error when init load tables. res=" << res
@@ -195,8 +193,7 @@ Status StorageEngine::_init_store_map() {
         DataDir* store = new DataDir(path.path, path.capacity_bytes, 
path.storage_medium,
                                      _tablet_manager.get(), 
_txn_manager.get());
         tmp_stores.emplace_back(store);
-        threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
-            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+        threads.emplace_back([store, &error_msg_lock, &error_msg]() {
             auto st = store->init();
             if (!st.ok()) {
                 {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 910c926c69..dcaa78ba56 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -322,8 +322,6 @@ private:
     // map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we 
need custom hash func
     std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
 
-    // StorageEngine oneself
-    std::shared_ptr<MemTracker> _mem_tracker;
     // Count the memory consumption of segment compaction tasks.
     std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
     // This mem tracker is only for tracking memory use by segment meta data 
such as footer or index page.
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 03d6b0f4d1..9d8633dabc 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -74,7 +74,9 @@ 
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE
                                    mem_consumption, Labels({{"type", 
"tablet_meta"}}));
 
 TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
-        : _mem_tracker(std::make_shared<MemTracker>("TabletManager")),
+        : _mem_tracker(std::make_shared<MemTracker>(
+                  "TabletManager", 
ExecEnv::GetInstance()->experimental_mem_tracker())),
+          _tablet_meta_mem_tracker(std::make_shared<MemTracker>("TabletMeta")),
           _tablets_shards_size(tablet_map_lock_shard_size),
           _tablets_shards_mask(tablet_map_lock_shard_size - 1) {
     CHECK_GT(_tablets_shards_size, 0);
@@ -207,6 +209,10 @@ Status 
TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
     tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
     tablet_map[tablet_id] = tablet;
     _add_tablet_to_partition(tablet);
+    // TODO: remove the multiplication by 2 of the tablet meta mem size.
+    // Because the table schema is copied into the tablet, the memory cost is doubled,
+    // so multiply by 2 here.
+    _tablet_meta_mem_tracker->consume(tablet->tablet_meta()->mem_size() * 2);
 
     VLOG_NOTICE << "add tablet to map successfully."
                 << " tablet_id=" << tablet_id;
@@ -489,6 +495,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId 
tablet_id, TReplicaId repl
     }
 
     to_drop_tablet->deregister_tablet_from_dir();
+    
_tablet_meta_mem_tracker->release(to_drop_tablet->tablet_meta()->mem_size() * 
2);
     return Status::OK();
 }
 
@@ -718,6 +725,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* 
data_dir, TTabletId tablet_
                                             TSchemaHash schema_hash, const 
string& meta_binary,
                                             bool update_meta, bool force, bool 
restore,
                                             bool check_path) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     TabletMetaSharedPtr tablet_meta(new TabletMeta());
     Status status = tablet_meta->deserialize(meta_binary);
     if (!status.ok()) {
@@ -800,6 +808,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* 
data_dir, TTabletId tablet_
 Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
                                            SchemaHash schema_hash, const 
string& schema_hash_path,
                                            bool force, bool restore) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     LOG(INFO) << "begin to load tablet from dir. "
               << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
               << " path = " << schema_hash_path << " force = " << force << " 
restore = " << restore;
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 58a60cbee7..30a64937cc 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -202,6 +202,7 @@ private:
 
     // trace the memory use by meta of tablet
     std::shared_ptr<MemTracker> _mem_tracker;
+    std::shared_ptr<MemTracker> _tablet_meta_mem_tracker;
 
     const int32_t _tablets_shards_size;
     const int32_t _tablets_shards_mask;
diff --git a/be/src/olap/task/engine_batch_load_task.cpp 
b/be/src/olap/task/engine_batch_load_task.cpp
index bd3abf3efc..a9135df050 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -50,7 +50,7 @@ using namespace ErrorCode;
 EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, 
std::vector<TTabletInfo>* tablet_infos)
         : _push_req(push_req), _tablet_infos(tablet_infos) {
     _mem_tracker = std::make_shared<MemTrackerLimiter>(
-            MemTrackerLimiter::Type::BATCHLOAD,
+            MemTrackerLimiter::Type::LOAD,
             fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", 
_push_req.push_type,
                         std::to_string(_push_req.tablet_id)));
 }
diff --git a/be/src/olap/task/engine_checksum_task.cpp 
b/be/src/olap/task/engine_checksum_task.cpp
index d1e8ae7eb2..c298b104c5 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -26,7 +26,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, 
TSchemaHash schema_h
                                        TVersion version, uint32_t* checksum)
         : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), 
_checksum(checksum) {
     _mem_tracker = std::make_shared<MemTrackerLimiter>(
-            MemTrackerLimiter::Type::CONSISTENCY,
+            MemTrackerLimiter::Type::LOAD,
             "EngineChecksumTask#tabletId=" + std::to_string(tablet_id));
 }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0b7ebfdadc..e2fde2d9e5 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -262,8 +262,9 @@ public:
         _runtime_profile.reset(new RuntimeProfile(
                 fmt::format("{} (id={})", _operator_builder->get_name(), 
_operator_builder->id())));
         _sink->profile()->insert_child_head(_runtime_profile.get(), true);
-        _mem_tracker = std::make_unique<MemTracker>("DataSinkOperator:" + 
_runtime_profile->name(),
-                                                    _runtime_profile.get());
+        _mem_tracker =
+                std::make_unique<MemTracker>("DataSinkOperator:" + 
_runtime_profile->name(),
+                                             _runtime_profile.get(), nullptr, 
"PeakMemoryUsage");
         return Status::OK();
     }
 
@@ -319,8 +320,9 @@ public:
         _runtime_profile.reset(new RuntimeProfile(
                 fmt::format("{} (id={})", _operator_builder->get_name(), 
_operator_builder->id())));
         _node->runtime_profile()->insert_child_head(_runtime_profile.get(), 
true);
-        _mem_tracker = std::make_unique<MemTracker>(get_name() + ": " + 
_runtime_profile->name(),
-                                                    _runtime_profile.get());
+        _mem_tracker =
+                std::make_unique<MemTracker>(get_name() + ": " + 
_runtime_profile->name(),
+                                             _runtime_profile.get(), nullptr, 
"PeakMemoryUsage");
         _node->increase_ref();
         return Status::OK();
     }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index d87b47db28..52fb8d2175 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -181,7 +181,6 @@ Status PipelineFragmentContext::prepare(const 
doris::TExecPlanFragmentParams& re
 
     // TODO should be combine with plan_fragment_executor.prepare funciton
     SCOPED_ATTACH_TASK(get_runtime_state());
-    _runtime_state->init_scanner_mem_trackers();
     _runtime_state->runtime_filter_mgr()->init();
     _runtime_state->set_be_number(request.backend_num);
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c47d15d9fa..f90cb98ba6 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -120,12 +120,10 @@ public:
         return nullptr;
     }
 
-    void set_orphan_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& 
orphan_tracker) {
-        _orphan_mem_tracker = orphan_tracker;
-        _orphan_mem_tracker_raw = orphan_tracker.get();
-    }
+    void init_mem_tracker();
     std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return 
_orphan_mem_tracker; }
     MemTrackerLimiter* orphan_mem_tracker_raw() { return 
_orphan_mem_tracker_raw; }
+    MemTrackerLimiter* experimental_mem_tracker() { return 
_experimental_mem_tracker.get(); }
     ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
     ThreadPool* send_batch_thread_pool() { return 
_send_batch_thread_pool.get(); }
     ThreadPool* download_cache_thread_pool() { return 
_download_cache_thread_pool.get(); }
@@ -215,6 +213,7 @@ private:
     // and the consumption of the orphan mem tracker is close to 0, but 
greater than 0.
     std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
     MemTrackerLimiter* _orphan_mem_tracker_raw;
+    std::shared_ptr<MemTrackerLimiter> _experimental_mem_tracker;
 
     std::unique_ptr<ThreadPool> _send_batch_thread_pool;
 
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 22fa7f9123..d557e8e0ef 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -166,9 +166,7 @@ Status ExecEnv::_init_mem_env() {
     bool is_percent = false;
     std::stringstream ss;
     // 1. init mem tracker
-    _orphan_mem_tracker =
-            
std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan");
-    _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
+    init_mem_tracker();
     thread_context()->thread_mem_tracker_mgr->init();
 #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && 
!defined(ADDRESS_SANITIZER) && \
         !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && 
!defined(USE_JEMALLOC)
@@ -236,6 +234,14 @@ Status ExecEnv::_init_mem_env() {
     return Status::OK();
 }
 
+void ExecEnv::init_mem_tracker() {
+    _orphan_mem_tracker =
+            
std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan");
+    _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
+    _experimental_mem_tracker = std::make_shared<MemTrackerLimiter>(
+            MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet");
+}
+
 void ExecEnv::init_download_cache_buf() {
     std::unique_ptr<char[]> download_cache_buf(new 
char[config::download_cache_buffer_size]);
     memset(download_cache_buf.get(), 0, config::download_cache_buffer_size);
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 8d26c47e02..bd00718472 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -75,9 +75,8 @@ LoadChannelMgr::~LoadChannelMgr() {
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
     _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
     _load_soft_mem_limit = _load_hard_mem_limit * 
config::load_process_soft_mem_limit_percent / 100;
-    _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
-    _mem_tracker_set = 
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
-                                                           
"LoadChannelMgrTrackerSet");
+    _mem_tracker =
+            std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, 
"LoadChannelMgr");
     REGISTER_HOOK_METRIC(load_channel_mem_consumption,
                          [this]() { return _mem_tracker->consumption(); });
     _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -105,7 +104,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& 
params) {
             auto channel_mem_tracker = std::make_unique<MemTracker>(
                     fmt::format("LoadChannel#senderIp={}#loadID={}", 
params.sender_ip(),
                                 load_id.to_string()),
-                    nullptr, 
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+                    ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
 #else
             auto channel_mem_tracker = 
std::make_unique<MemTracker>(fmt::format(
                     "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), 
load_id.to_string()));
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index 967617e9bc..a825b74bca 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -58,7 +58,7 @@ public:
         std::lock_guard<std::mutex> l(_lock);
         _refresh_mem_tracker_without_lock();
     }
-    MemTrackerLimiter* mem_tracker_set() { return _mem_tracker_set.get(); }
+    MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
 
 private:
     template <typename Request>
@@ -74,11 +74,12 @@ private:
 
     // lock should be held when calling this method
     void _refresh_mem_tracker_without_lock() {
-        int64_t mem_usage = 0;
+        _mem_usage = 0;
         for (auto& kv : _load_channels) {
-            mem_usage += kv.second->mem_consumption();
+            _mem_usage += kv.second->mem_consumption();
         }
-        _mem_tracker->set_consumption(mem_usage);
+        THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - 
_mem_tracker->consumption(),
+                                       _mem_tracker.get());
     }
 
 protected:
@@ -89,9 +90,8 @@ protected:
     Cache* _last_success_channel = nullptr;
 
     // check the total load channel mem consumption of this Backend
-    std::unique_ptr<MemTracker> _mem_tracker;
-    // Associate load channel tracker and memtable tracker, avoid default 
association to Orphan tracker.
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
+    int64_t _mem_usage = 0;
+    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
     int64_t _load_hard_mem_limit = -1;
     int64_t _load_soft_mem_limit = -1;
     bool _soft_reduce_mem_in_progress = false;
diff --git a/be/src/runtime/memory/mem_tracker.cpp 
b/be/src/runtime/memory/mem_tracker.cpp
index 7d1c3de7c1..1888f366a5 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -40,8 +40,8 @@ struct TrackerGroup {
 static std::vector<TrackerGroup> mem_tracker_pool(1000);
 
 MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, 
MemTrackerLimiter* parent,
-                       const std::string& profile_counter_name, bool 
only_track_alloc)
-        : _label(label), _only_track_alloc(only_track_alloc) {
+                       const std::string& profile_counter_name)
+        : _label(label) {
     if (profile == nullptr) {
         _consumption = 
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
     } else {
@@ -56,7 +56,15 @@ MemTracker::MemTracker(const std::string& label, 
RuntimeProfile* profile, MemTra
         // release().
         _consumption = 
profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES);
     }
+    bind_parent(parent);
+}
+
+MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : 
_label(label) {
+    _consumption = 
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
+    bind_parent(parent);
+}
 
+void MemTracker::bind_parent(MemTrackerLimiter* parent) {
     if (parent) {
         _parent_label = parent->label();
         _parent_group_num = parent->group_num();
@@ -95,7 +103,7 @@ void 
MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot
                                      int64_t group_num, std::string 
parent_label) {
     std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
     for (auto tracker : mem_tracker_pool[group_num].trackers) {
-        if (tracker->parent_label() == parent_label) {
+        if (tracker->parent_label() == parent_label && tracker->consumption() 
!= 0) {
             snapshots->push_back(tracker->make_snapshot());
         }
     }
diff --git a/be/src/runtime/memory/mem_tracker.h 
b/be/src/runtime/memory/mem_tracker.h
index 46d1ec24d4..509362397a 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -44,10 +44,9 @@ public:
     };
 
     // Creates and adds the tracker to the mem_tracker_pool.
-    MemTracker(const std::string& label, RuntimeProfile* profile = nullptr,
-               MemTrackerLimiter* parent = nullptr,
-               const std::string& profile_counter_name = "PeakMemoryUsage",
-               bool only_track_alloc = false);
+    MemTracker(const std::string& label, RuntimeProfile* profile, 
MemTrackerLimiter* parent,
+               const std::string& profile_counter_name);
+    MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr);
     // For MemTrackerLimiter
     MemTracker() { _parent_group_num = -1; }
 
@@ -61,13 +60,13 @@ public:
 public:
     const std::string& label() const { return _label; }
     const std::string& parent_label() const { return _parent_label; }
+    const std::string& set_parent_label() const { return _parent_label; }
     // Returns the memory consumed in bytes.
     int64_t consumption() const { return _consumption->current_value(); }
     int64_t peak_consumption() const { return _consumption->value(); }
 
     void consume(int64_t bytes) {
         if (bytes == 0) return;
-        if (bytes < 0 && _only_track_alloc) return;
         _consumption->add(bytes);
     }
     void release(int64_t bytes) { consume(-bytes); }
@@ -89,6 +88,8 @@ public:
     }
 
 protected:
+    void bind_parent(MemTrackerLimiter* parent);
+
     // label used in the make snapshot, not guaranteed unique.
     std::string _label;
 
@@ -96,10 +97,9 @@ protected:
 
     // Tracker is located in group num in mem_tracker_pool
     int64_t _parent_group_num = 0;
+    // Use _parent_label to correlate with parent limiter tracker.
     std::string _parent_label = "-";
 
-    bool _only_track_alloc = false;
-
     // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove.
     std::list<MemTracker*>::iterator _tracker_group_it;
 };
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 05e53baaf9..87ef52fbed 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -97,9 +97,8 @@ MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const {
 
 void MemTrackerLimiter::refresh_global_counter() {
     std::unordered_map<Type, int64_t> type_mem_sum = {
-            {Type::GLOBAL, 0},     {Type::QUERY, 0},         {Type::LOAD, 0},
-            {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0},
-            {Type::BATCHLOAD, 0},  {Type::CONSISTENCY, 0}};
+            {Type::GLOBAL, 0},        {Type::QUERY, 0}, {Type::LOAD, 0}, {Type::COMPACTION, 0},
+            {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0}}; // No need refresh Type::EXPERIMENTAL
     for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) {
         std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
         for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
@@ -327,6 +326,11 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
         }
     }
 
+    // Minor gc does not cancel when there is only one query. full gc conver.
+    if (query_consumption.size() <= 1) {
+        return 0;
+    }
+
     std::priority_queue<std::pair<int64_t, std::string>> max_pq;
     // Min-heap to Max-heap.
     while (!min_pq.empty()) {
@@ -341,15 +345,16 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
         int64_t query_mem = query_consumption[max_pq.top().second];
         ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                 cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
-                fmt::format("Process has no memory available, cancel top memory usage query: "
+                fmt::format("Process has less memory, cancel top memory overcommit query: "
                             "query memory tracker <{}> consumption {}, backend {} "
-                            "process memory used {} exceed limit {} or sys mem available {} "
-                            "less than low water mark {}. Execute again after enough memory, "
+                            "process memory used {} exceed soft limit {} or sys mem available {} "
+                            "less than warning water mark {}. Execute again after enough memory, "
                             "details see be.INFO.",
                             max_pq.top().second, print_bytes(query_mem),
                             BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
-                            MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(),
-                            print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+                            print_bytes(MemInfo::soft_mem_limit()),
+                            MemInfo::sys_mem_available_str(),
+                            print_bytes(MemInfo::sys_mem_available_warning_water_mark())));
 
         usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
                                             max_pq.top().second, query_mem, max_pq.top().first));
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 7696908dab..bb346a2a3b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -46,8 +46,8 @@ public:
         COMPACTION = 3,    // Count the memory consumption of all Base and Cumulative tasks.
         SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
         CLONE = 5, // Count the memory consumption of all EngineCloneTask. Note: Memory that does not contain make/release snapshots.
-        BATCHLOAD = 6,  // Count the memory consumption of all EngineBatchLoadTask.
-        CONSISTENCY = 7 // Count the memory consumption of all EngineChecksumTask.
+        EXPERIMENTAL =
+                6 // Experimental memory statistics, usually inaccurate, used for debugging, and expect to add other types in the future.
     };
 
     inline static std::unordered_map<Type, std::shared_ptr<RuntimeProfile::HighWaterMarkCounter>>
@@ -63,14 +63,11 @@ public:
                            std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
                           {Type::CLONE,
                            std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
-                          {Type::BATCHLOAD,
-                           std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
-                          {Type::CONSISTENCY,
+                          {Type::EXPERIMENTAL,
                            std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}};
 
-    inline static const std::string TypeString[] = {"global",     "query",         "load",
-                                                    "compaction", "schema_change", "clone",
-                                                    "batch_load", "consistency"};
+    inline static const std::string TypeString[] = {
+            "global", "query", "load", "compaction", "schema_change", "clone", "experimental"};
 
 public:
     // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics.
@@ -161,7 +158,7 @@ public:
 
     static std::string process_mem_log_str() {
         return fmt::format(
-                "physical memory {}, process memory used {} limit {}, sys mem available {} low "
+                "OS physical memory {}, process memory used {} limit {}, sys mem available {} low "
                 "water mark {}, refresh interval memory growth {} B",
                 PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
                 PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index f36cefaf51..748191b177 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -100,7 +100,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     _runtime_state->set_tracer(std::move(tracer));
 
     SCOPED_ATTACH_TASK(_runtime_state.get());
-    _runtime_state->init_scanner_mem_trackers();
     _runtime_state->runtime_filter_mgr()->init();
     _runtime_state->set_be_number(request.backend_num);
     if (request.__isset.backend_id) {
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index bf0bfd0d9d..4d5478a596 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -45,7 +45,8 @@ RuntimeFilterMgr::~RuntimeFilterMgr() {}
 
 Status RuntimeFilterMgr::init() {
     DCHECK(_state->query_mem_tracker() != nullptr);
-    _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr");
+    _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
+                                            ExecEnv::GetInstance()->experimental_mem_tracker());
     return Status::OK();
 }
 
@@ -161,7 +162,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
                                                 const TQueryOptions& query_options) {
     _query_id = query_id;
     _fragment_instance_id = fragment_instance_id;
-    _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity");
+    _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity",
+                                                ExecEnv::GetInstance()->experimental_mem_tracker());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
         int filter_id = filterid_to_desc.first;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 044a619e03..c3bd7d0c3f 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -216,8 +216,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
 Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
     _query_mem_tracker = std::make_shared<MemTrackerLimiter>(
             MemTrackerLimiter::Type::QUERY, fmt::format("TestQuery#Id={}", print_id(query_id)));
-    _scanner_mem_tracker =
-            std::make_shared<MemTracker>(fmt::format("TestScanner#QueryId={}", print_id(query_id)));
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ee22760ab1..d72631de92 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -75,11 +75,6 @@ public:
     Status init(const TUniqueId& fragment_instance_id, const TQueryOptions& 
query_options,
                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
 
-    // after SCOPED_ATTACH_TASK;
-    void init_scanner_mem_trackers() {
-        _scanner_mem_tracker = std::make_shared<MemTracker>(
-                fmt::format("Scanner#QueryId={}", print_id(_query_id)));
-    }
     // for ut and non-query.
     Status init_mem_trackers(const TUniqueId& query_id = TUniqueId());
 
@@ -111,7 +106,6 @@ public:
     const TUniqueId& fragment_instance_id() const { return 
_fragment_instance_id; }
     ExecEnv* exec_env() { return _exec_env; }
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return 
_query_mem_tracker; }
-    std::shared_ptr<MemTracker> scanner_mem_tracker() { return 
_scanner_mem_tracker; }
     ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; }
 
     void set_fragment_root_id(PlanNodeId id) {
@@ -431,8 +425,6 @@ private:
     static const int DEFAULT_BATCH_SIZE = 2048;
 
     std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
-    // Count the memory consumption of Scanner
-    std::shared_ptr<MemTracker> _scanner_mem_tracker;
 
     // put runtime state before _obj_pool, so that it will be deconstructed 
after
     // _obj_pool. Because some of object in _obj_pool will use profile when 
deconstructing.
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index c6e6ecea45..fd87c56f98 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -356,7 +356,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 
 Status HashJoinNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, 
true);
     runtime_profile()->add_child(memory_usage, false, nullptr);
@@ -662,7 +661,6 @@ Status HashJoinNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(VJoinNodeBase::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_CANCELLED(state);
     return Status::OK();
 }
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp 
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 5e5c24fcfc..e186f15d8c 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -176,7 +176,6 @@ Status VJoinNodeBase::init(const TPlanNode& tnode, 
RuntimeState* state) {
 Status VJoinNodeBase::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::open");
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_CANCELLED(state);
 
     std::promise<Status> thread_status;
@@ -220,7 +219,6 @@ void VJoinNodeBase::_reset_tuple_is_null_column() {
 void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state, 
std::promise<Status>* status) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VJoinNodeBase::_hash_table_build_thread");
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared());
     status->set_value(child(0)->open(state));
 }
 
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 39ae99a36e..00ed59bcbd 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -103,7 +103,6 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", 
TUnit::UNIT);
@@ -237,7 +236,6 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, 
Block* block, bool* eo
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_TIMER(_probe_timer);
     RETURN_IF_CANCELLED(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     while (need_more_input_data()) {
         RETURN_IF_ERROR(_fresh_left_block(state));
@@ -605,7 +603,6 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VNestedLoopJoinNode::open")
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(VJoinNodeBase::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_CANCELLED(state);
     // We can close the right child to release its resources because its input 
has been
     // fully consumed.
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp 
b/be/src/vec/exec/scan/new_es_scan_node.cpp
index 61f7f4022c..5b22c2ef19 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -76,7 +76,6 @@ Status NewEsScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 Status NewEsScanNode::prepare(RuntimeState* state) {
     VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
     RETURN_IF_ERROR(VScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     if (_tuple_desc == nullptr) {
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp 
b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
index b328151a92..19fd93b9d3 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -37,7 +37,6 @@ std::string NewJdbcScanNode::get_name() {
 Status NewJdbcScanNode::prepare(RuntimeState* state) {
     VLOG_CRITICAL << "VNewJdbcScanNode::Prepare";
     RETURN_IF_ERROR(VScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp 
b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
index 2bd6ad4c34..dbbf57a120 100644
--- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
@@ -38,7 +38,6 @@ std::string NewOdbcScanNode::get_name() {
 Status NewOdbcScanNode::prepare(RuntimeState* state) {
     VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
     RETURN_IF_ERROR(VScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index a84443d38d..c6df1722cb 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -46,7 +46,6 @@ Status 
NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
 }
 
 Status NewOlapScanNode::prepare(RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(VScanNode::prepare(state));
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index f5e270c606..fe7356d0e8 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -191,7 +191,6 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* 
ctx) {
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, 
ScannerContext* ctx,
                                      VScanner* scanner) {
     SCOPED_ATTACH_TASK(scanner->runtime_state());
-    SCOPED_CONSUME_MEM_TRACKER(scanner->runtime_state()->scanner_mem_tracker());
     Thread::set_self_name("_scanner_scan");
     scanner->update_wait_worker_timer();
     scanner->start_scan_cpu_timer();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 78e7d91bd3..ebd4132281 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -75,7 +75,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
 
 Status VScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     // init profile for runtime filter
     for (auto& rf_ctx : _runtime_filter_ctxs) {
@@ -85,7 +84,6 @@ Status VScanNode::prepare(RuntimeState* state) {
 }
 
 Status VScanNode::open(RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
@@ -96,7 +94,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
     if (_opened) {
         return Status::OK();
     }
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     _input_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VScanNode::alloc_resource");
@@ -125,7 +122,6 @@ Status VScanNode::get_next(RuntimeState* state, 
vectorized::Block* block, bool*
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, 
"VScanNode::get_next");
     SCOPED_TIMER(_get_next_timer);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     // in inverted index apply logic, in order to optimize query performance,
     // we built some temporary columns into block, these columns only used in 
scan node level,
     // remove them when query leave scan node to avoid other nodes use 
block->columns() to make a wrong decision
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index 89504b20ef..6b33cba1d3 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -446,7 +446,6 @@ Status AggregationNode::prepare_profile(RuntimeState* 
state) {
 }
 
 Status AggregationNode::prepare(RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     RETURN_IF_ERROR(ExecNode::prepare(state));
@@ -456,7 +455,6 @@ Status AggregationNode::prepare(RuntimeState* state) {
 
 Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::alloc_resource(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
 
@@ -534,7 +532,6 @@ Status AggregationNode::get_next(RuntimeState* state, 
Block* block, bool* eos) {
                     _children[0]->get_next_span(), _child_eos);
         };
         {
-            SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
             if (_preagg_block.rows() != 0) {
                 RETURN_IF_ERROR(do_pre_agg(&_preagg_block, block));
             } else {
@@ -542,7 +539,6 @@ Status AggregationNode::get_next(RuntimeState* state, 
Block* block, bool* eos) {
             }
         }
     } else {
-        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         RETURN_IF_ERROR(pull(state, block, eos));
     }
     return Status::OK();
@@ -758,7 +754,7 @@ Status AggregationNode::_merge_without_key(Block* block) {
 
 void AggregationNode::_update_memusage_without_key() {
     auto arena_memory_usage = _agg_arena_pool->size() - 
_mem_usage_record.used_in_arena;
-    mem_tracker_held()->consume(arena_memory_usage);
+    mem_tracker()->consume(arena_memory_usage);
     _serialize_key_arena_memory_usage->add(arena_memory_usage);
     _mem_usage_record.used_in_arena = _agg_arena_pool->size();
 }
@@ -1370,9 +1366,9 @@ void 
AggregationNode::_update_memusage_with_serialized_key() {
                 auto arena_memory_usage = _agg_arena_pool->size() +
                                           _aggregate_data_container->memory_usage() -
                                           _mem_usage_record.used_in_arena;
-                mem_tracker_held()->consume(arena_memory_usage);
-                mem_tracker_held()->consume(data.get_buffer_size_in_bytes() -
-                                            _mem_usage_record.used_in_state);
+                mem_tracker()->consume(arena_memory_usage);
+                mem_tracker()->consume(data.get_buffer_size_in_bytes() -
+                                       _mem_usage_record.used_in_state);
                 _serialize_key_arena_memory_usage->add(arena_memory_usage);
                 COUNTER_UPDATE(_hash_table_memory_usage,
                                data.get_buffer_size_in_bytes() - 
_mem_usage_record.used_in_state);
@@ -1399,7 +1395,7 @@ void AggregationNode::_close_with_serialized_key() {
 }
 
 void AggregationNode::release_tracker() {
-    mem_tracker_held()->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena);
+    mem_tracker()->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena);
 }
 
 void AggregationNode::_release_mem() {
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp 
b/be/src/vec/exec/vanalytic_eval_node.cpp
index b2235de9ab..237d583cc0 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -140,7 +140,6 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 }
 
 Status VAnalyticEvalNode::prepare(RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
     DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor));
@@ -211,7 +210,6 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
 
 Status VAnalyticEvalNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(alloc_resource(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(child(0)->open(state));
     return Status::OK();
 }
@@ -226,13 +224,11 @@ Status VAnalyticEvalNode::close(RuntimeState* state) {
 
 Status VAnalyticEvalNode::alloc_resource(RuntimeState* state) {
     {
-        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VAnalyticEvalNode::open");
         SCOPED_TIMER(_runtime_profile->total_time_counter());
         RETURN_IF_ERROR(ExecNode::alloc_resource(state));
         RETURN_IF_CANCELLED(state);
     }
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state));
     RETURN_IF_ERROR(VExpr::open(_order_by_eq_expr_ctxs, state));
     for (size_t i = 0; i < _agg_functions_size; ++i) {
@@ -322,14 +318,12 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, 
vectorized::Block* block
             return Status::OK();
         }
         size_t current_block_rows = _input_blocks[_output_block_index].rows();
-        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         RETURN_IF_ERROR(_executor.get_next(current_block_rows));
         if (_window_end_position == current_block_rows) {
             break;
         }
     }
     RETURN_IF_ERROR(_output_current_block(block));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, 
block->columns()));
     reached_limit(block, eos);
     return Status::OK();
@@ -393,8 +387,6 @@ Status 
VAnalyticEvalNode::_consumed_block_and_init_partition(RuntimeState* state
         found_partition_end = _get_partition_by_end();  //claculate new 
partition end
     }
 
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-
     if (_input_eos && _input_total_rows == 0) {
         *eos = true;
         return Status::OK();
@@ -528,8 +520,6 @@ Status VAnalyticEvalNode::sink(doris::RuntimeState* 
/*state*/, vectorized::Block
         return Status::OK();
     }
 
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-
     input_block_first_row_positions.emplace_back(_input_total_rows);
     size_t block_rows = input_block->rows();
     _input_total_rows += block_rows;
@@ -566,7 +556,7 @@ Status VAnalyticEvalNode::sink(doris::RuntimeState* 
/*state*/, vectorized::Block
         _ordey_by_column_idxs[i] = result_col_id;
     }
 
-    mem_tracker_held()->consume(input_block->allocated_bytes());
+    mem_tracker()->consume(input_block->allocated_bytes());
     _blocks_memory_usage->add(input_block->allocated_bytes());
 
     //TODO: if need improvement, the is a tips to maintain a free queue,
@@ -627,11 +617,9 @@ void VAnalyticEvalNode::_insert_result_info(int64_t 
current_block_rows) {
 }
 
 Status VAnalyticEvalNode::_output_current_block(Block* block) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-
     block->swap(std::move(_input_blocks[_output_block_index]));
     _blocks_memory_usage->add(-block->allocated_bytes());
-    mem_tracker_held()->consume(-block->allocated_bytes());
+    mem_tracker()->consume(-block->allocated_bytes());
     if (_origin_cols.size() < block->columns()) {
         block->erase_not_in(_origin_cols);
     }
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp 
b/be/src/vec/exec/vbroker_scan_node.cpp
index ac96f97965..a94b73592a 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -59,7 +59,6 @@ Status VBrokerScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 Status VBrokerScanNode::prepare(RuntimeState* state) {
     VLOG_QUERY << "VBrokerScanNode prepare";
     RETURN_IF_ERROR(ScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     // get tuple desc
     _runtime_state = state;
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -85,7 +84,6 @@ Status VBrokerScanNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBrokerScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_CANCELLED(state);
 
     RETURN_IF_ERROR(start_scanners());
@@ -109,7 +107,6 @@ Status VBrokerScanNode::start_scanners() {
 Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, 
"VBrokerScanNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     // check if CANCELLED.
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_batch_queue_lock);
@@ -271,7 +268,6 @@ Status VBrokerScanNode::scanner_scan(const 
TBrokerScanRange& scan_range, Scanner
 void VBrokerScanNode::scanner_worker(int start_idx, int length) {
     START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, 
"VBrokerScanNode::scanner_worker");
     SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared());
     Thread::set_self_name("vbroker_scanner");
     Status status = Status::OK();
     ScannerCounter counter;
diff --git a/be/src/vec/exec/vexchange_node.cpp 
b/be/src/vec/exec/vexchange_node.cpp
index 272116cbe1..069caaf635 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -54,7 +54,6 @@ Status VExchangeNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 
 Status VExchangeNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     DCHECK_GT(_num_senders, 0);
     _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
     _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
@@ -83,7 +82,6 @@ Status VExchangeNode::alloc_resource(RuntimeState* state) {
 Status VExchangeNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExchangeNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(ExecNode::open(state));
 
     return Status::OK();
@@ -92,7 +90,6 @@ Status VExchangeNode::open(RuntimeState* state) {
 Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, 
"VExchangeNode::get_next");
     SCOPED_TIMER(runtime_profile()->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     if (_is_merging && state->enable_pipeline_exec() && !_is_ready) {
         RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
                                                      _is_asc_order, _nulls_first,
diff --git a/be/src/vec/exec/vmysql_scan_node.cpp 
b/be/src/vec/exec/vmysql_scan_node.cpp
index 3fb03bd19c..dcc7593ee9 100644
--- a/be/src/vec/exec/vmysql_scan_node.cpp
+++ b/be/src/vec/exec/vmysql_scan_node.cpp
@@ -49,7 +49,6 @@ Status VMysqlScanNode::prepare(RuntimeState* state) {
     }
 
     RETURN_IF_ERROR(ScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     // get tuple desc
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
 
@@ -103,7 +102,6 @@ Status VMysqlScanNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     VLOG_CRITICAL << "MysqlScanNode::Open";
 
     if (!_is_init) {
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 8a090ad0d6..c3fe0e8d0f 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -44,7 +44,6 @@ Status VRepeatNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     if (_output_tuple_desc == nullptr) {
         return Status::InternalError("Failed to get tuple descriptor.");
@@ -72,7 +71,6 @@ Status VRepeatNode::alloc_resource(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VRepeatNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::alloc_resource(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(VExpr::open(_expr_ctxs, state));
     return Status::OK();
 }
diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp
index 9e3e6a1c42..65588eb11b 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -122,7 +122,6 @@ Status VSchemaScanNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     if (_scanner_param.user) {
         TSetSessionParams param;
@@ -145,7 +144,6 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
     }
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSchemaScanNode::prepare");
     RETURN_IF_ERROR(ScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     // new one mem pool
     _tuple_pool.reset(new (std::nothrow) MemPool());
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 9ad65dbf1c..253d360569 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -193,7 +193,6 @@ Status VSetOperationNode<is_intersect>::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode<is_intersect>::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     // TODO: build the hash table in a thread to open other children asynchronously.
     RETURN_IF_ERROR(hash_table_build(state));
@@ -234,7 +233,6 @@ template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
     _pull_timer = ADD_TIMER(runtime_profile(), "PullTime");
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 4ebdd05506..fd7c88af75 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -87,7 +87,6 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
 }
 
 Status VSortNode::prepare(RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
     _runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" : "true");
@@ -102,7 +101,6 @@ Status VSortNode::prepare(RuntimeState* state) {
 
 Status VSortNode::alloc_resource(doris::RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::alloc_resource(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("vsort, while open."));
@@ -161,13 +159,12 @@ Status VSortNode::open(RuntimeState* state) {
                                   _children[0], std::placeholders::_1, std::placeholders::_2,
                                   std::placeholders::_3)),
                 child(0)->get_next_span(), eos);
-        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         RETURN_IF_ERROR(sink(state, upstream_block.get(), eos));
     } while (!eos);
 
     child(0)->close(state);
 
-    mem_tracker_held()->consume(_sorter->data_size());
+    mem_tracker()->consume(_sorter->data_size());
     COUNTER_UPDATE(_sort_blocks_memory_usage, _sorter->data_size());
 
     return Status::OK();
@@ -183,7 +180,6 @@ Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_blo
 }
 
 Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSortNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp
index c3e9f2a829..f7fe4f71d6 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -83,7 +83,6 @@ bool VTableFunctionNode::_is_inner_and_empty() {
 Status VTableFunctionNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     _num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", TUnit::UNIT);
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 3efb84b4c0..f58e3d1652 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -308,8 +308,9 @@ VDataStreamRecvr::VDataStreamRecvr(
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
           _enable_pipeline(state->enable_pipeline_exec()) {
     // DataStreamRecvr may be destructed after the instance execution thread ends.
-    _mem_tracker = std::make_unique<MemTracker>(
-            "VDataStreamRecvr:" + print_id(_fragment_instance_id), _profile);
+    _mem_tracker =
+            std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id),
+                                         _profile, nullptr, "PeakMemoryUsage");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     // Create one queue per sender if is_merging is true.
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 1d36902561..8b87ff4840 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -421,7 +421,8 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     _profile = _pool->add(new RuntimeProfile(title));
     SCOPED_TIMER(_profile->total_time_counter());
     _mem_tracker = std::make_unique<MemTracker>(
-            "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile);
+            "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile, nullptr,
+            "PeakMemoryUsage");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index 99a338a5aa..cd6499831f 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -28,10 +28,7 @@
 #include "util/mem_info.h"
 
 int main(int argc, char** argv) {
-    std::shared_ptr<doris::MemTrackerLimiter> orphan_mem_tracker =
-            std::make_shared<doris::MemTrackerLimiter>(doris::MemTrackerLimiter::Type::GLOBAL,
-                                                       "Orphan");
-    doris::ExecEnv::GetInstance()->set_orphan_mem_tracker(orphan_mem_tracker);
+    doris::ExecEnv::GetInstance()->init_mem_tracker();
     doris::thread_context()->thread_mem_tracker_mgr->init();
     doris::TabletSchemaCache::create_global_schema_cache();
     doris::StoragePageCache::create_global_cache(1 << 30, 10);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to