This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 97fcad76f8 [enhancement](memtracker) Improve readability (#15716)
97fcad76f8 is described below
commit 97fcad76f83adccc43eaff995bf1d570ded30605
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Jan 16 16:30:35 2023 +0800
[enhancement](memtracker) Improve readability (#15716)
---
be/src/exec/exec_node.cpp | 15 ++-------------
be/src/exec/exec_node.h | 12 ++++--------
be/src/http/default_path_handlers.cpp | 6 ++----
be/src/olap/delta_writer.cpp | 4 ++--
be/src/olap/memtable.cpp | 4 ++--
be/src/olap/olap_server.cpp | 10 ++--------
be/src/olap/push_handler.cpp | 4 ++--
be/src/olap/storage_engine.cpp | 7 ++-----
be/src/olap/storage_engine.h | 2 --
be/src/olap/tablet_manager.cpp | 11 ++++++++++-
be/src/olap/tablet_manager.h | 1 +
be/src/olap/task/engine_batch_load_task.cpp | 2 +-
be/src/olap/task/engine_checksum_task.cpp | 2 +-
be/src/pipeline/exec/operator.h | 10 ++++++----
be/src/pipeline/pipeline_fragment_context.cpp | 1 -
be/src/runtime/exec_env.h | 7 +++----
be/src/runtime/exec_env_init.cpp | 12 +++++++++---
be/src/runtime/load_channel_mgr.cpp | 7 +++----
be/src/runtime/load_channel_mgr.h | 14 +++++++-------
be/src/runtime/memory/mem_tracker.cpp | 14 +++++++++++---
be/src/runtime/memory/mem_tracker.h | 14 +++++++-------
be/src/runtime/memory/mem_tracker_limiter.cpp | 21 +++++++++++++--------
be/src/runtime/memory/mem_tracker_limiter.h | 15 ++++++---------
be/src/runtime/plan_fragment_executor.cpp | 1 -
be/src/runtime/runtime_filter_mgr.cpp | 6 ++++--
be/src/runtime/runtime_state.cpp | 2 --
be/src/runtime/runtime_state.h | 8 --------
be/src/vec/exec/join/vhash_join_node.cpp | 2 --
be/src/vec/exec/join/vjoin_node_base.cpp | 2 --
be/src/vec/exec/join/vnested_loop_join_node.cpp | 3 ---
be/src/vec/exec/scan/new_es_scan_node.cpp | 1 -
be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 1 -
be/src/vec/exec/scan/new_odbc_scan_node.cpp | 1 -
be/src/vec/exec/scan/new_olap_scan_node.cpp | 1 -
be/src/vec/exec/scan/scanner_scheduler.cpp | 1 -
be/src/vec/exec/scan/vscan_node.cpp | 4 ----
be/src/vec/exec/vaggregation_node.cpp | 14 +++++---------
be/src/vec/exec/vanalytic_eval_node.cpp | 16 ++--------------
be/src/vec/exec/vbroker_scan_node.cpp | 4 ----
be/src/vec/exec/vexchange_node.cpp | 3 ---
be/src/vec/exec/vmysql_scan_node.cpp | 2 --
be/src/vec/exec/vrepeat_node.cpp | 2 --
be/src/vec/exec/vschema_scan_node.cpp | 2 --
be/src/vec/exec/vset_operation_node.cpp | 2 --
be/src/vec/exec/vsort_node.cpp | 6 +-----
be/src/vec/exec/vtable_function_node.cpp | 1 -
be/src/vec/runtime/vdata_stream_recvr.cpp | 5 +++--
be/src/vec/sink/vdata_stream_sender.cpp | 3 ++-
be/test/testutil/run_all_tests.cpp | 5 +----
49 files changed, 114 insertions(+), 179 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 9bd0c5c494..e0c85d9816 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -142,18 +142,8 @@ Status ExecNode::prepare(RuntimeState* state) {
std::bind<int64_t>(&RuntimeProfile::units_per_second,
_rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
- _mem_tracker_held =
- std::make_unique<MemTracker>("ExecNode:" +
_runtime_profile->name(),
- _runtime_profile.get(), nullptr,
"PeakMemoryUsage");
- // Only when the query profile is enabled, the node allocated memory will
be track through the mem hook,
- // otherwise _mem_tracker_growh is nullptr, and SCOPED_CONSUME_MEM_TRACKER
will do nothing.
- if (state->query_options().__isset.is_report_success &&
- state->query_options().is_report_success) {
- _mem_tracker_growh = std::make_shared<MemTracker>(
- "ExecNode:MemoryOnlyTrackAlloc:" + _runtime_profile->name(),
_runtime_profile.get(),
- nullptr, "MemoryOnlyTrackAllocNoConsiderFree", true);
- }
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+ _mem_tracker = std::make_unique<MemTracker>("ExecNode:" +
_runtime_profile->name(),
+ _runtime_profile.get(),
nullptr, "PeakMemoryUsage");
if (_vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state,
intermediate_row_desc()));
@@ -176,7 +166,6 @@ Status ExecNode::prepare(RuntimeState* state) {
}
Status ExecNode::alloc_resource(doris::RuntimeState* state) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
if (_vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
}
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 353db077f3..76ab99fd42 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -229,9 +229,7 @@ public:
RuntimeProfile* runtime_profile() const { return _runtime_profile.get(); }
RuntimeProfile::Counter* memory_used_counter() const { return
_memory_used_counter; }
- MemTracker* mem_tracker_held() const { return _mem_tracker_held.get(); }
- MemTracker* mem_tracker_growh() const { return _mem_tracker_growh.get(); }
- std::shared_ptr<MemTracker> mem_tracker_growh_shared() const { return
_mem_tracker_growh; }
+ MemTracker* mem_tracker() const { return _mem_tracker.get(); }
OpentelemetrySpan get_next_span() { return _get_next_span; }
@@ -279,11 +277,9 @@ protected:
std::unique_ptr<RuntimeProfile> _runtime_profile;
- // Record the memory size held by this node.
- std::unique_ptr<MemTracker> _mem_tracker_held;
- // Record the memory size allocated by this node.
- // Similar to tcmalloc heap profile growh, only track memory alloc, not
track memory free.
- std::shared_ptr<MemTracker> _mem_tracker_growh;
+ // Record this node memory size. it is expected that artificial guarantees
are accurate,
+ // which will provide a reference for operator memory.
+ std::unique_ptr<MemTracker> _mem_tracker;
RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _rows_returned_rate;
diff --git a/be/src/http/default_path_handlers.cpp
b/be/src/http/default_path_handlers.cpp
index 42a29d21e9..e0ddfbbb37 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -147,11 +147,9 @@ void mem_tracker_handler(const
WebPageHandler::ArgumentMap& args, std::stringstr
MemTrackerLimiter::Type::SCHEMA_CHANGE);
} else if (iter->second == "clone") {
MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::CLONE);
- } else if (iter->second == "batch_load") {
- MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::BATCHLOAD);
- } else if (iter->second == "consistency") {
+ } else if (iter->second == "experimental") {
MemTrackerLimiter::make_type_snapshots(&snapshots,
-
MemTrackerLimiter::Type::CONSISTENCY);
+
MemTrackerLimiter::Type::EXPERIMENTAL);
}
} else {
(*output) << "<h4>*Note: (see documentation for details)</h4>\n";
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c001b67cad..101b46bf6a 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -245,11 +245,11 @@ void DeltaWriter::_reset_mem_table() {
auto mem_table_insert_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num,
_load_id.to_string()),
- nullptr,
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+ ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
auto mem_table_flush_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num++,
_load_id.to_string()),
- nullptr,
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+ ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
#else
auto mem_table_insert_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ea1188e874..53d785f2c5 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -57,8 +57,8 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema,
const TabletSchema* t
_cur_max_version(cur_max_version) {
#ifndef BE_TEST
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
- fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id())), nullptr,
- ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+ fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id())),
+ ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
#else
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id())));
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 2d5d12c860..7ab1bf1a97 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -120,20 +120,14 @@ Status StorageEngine::start_bg_threads() {
scoped_refptr<Thread> path_scan_thread;
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_scan_thread",
- [this, data_dir]() {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
- this->_path_scan_thread_callback(data_dir);
- },
+ [this, data_dir]() {
this->_path_scan_thread_callback(data_dir); },
&path_scan_thread));
_path_scan_threads.emplace_back(path_scan_thread);
scoped_refptr<Thread> path_gc_thread;
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_gc_thread",
- [this, data_dir]() {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
- this->_path_gc_thread_callback(data_dir);
- },
+ [this, data_dir]() {
this->_path_gc_thread_callback(data_dir); },
&path_gc_thread));
_path_gc_threads.emplace_back(path_gc_thread);
}
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index ad3bb25e79..49b01f6418 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -833,8 +833,8 @@ Status PushBrokerReader::init(const Schema* schema, const
TBrokerScanRange& t_sc
}
_runtime_profile = _runtime_state->runtime_profile();
_runtime_profile->set_name("PushBrokerReader");
- _mem_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get()));
- _tuple_buffer_pool.reset(new
MemPool(_runtime_state->scanner_mem_tracker().get()));
+ _mem_pool.reset(new MemPool());
+ _tuple_buffer_pool.reset(new MemPool());
_counter.reset(new ScannerCounter());
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index ffa137c26f..cea5856dbb 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -108,7 +108,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_effective_cluster_id(-1),
_is_all_cluster_id_exist(true),
_stopped(false),
- _mem_tracker(std::make_shared<MemTracker>("StorageEngine")),
_segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
_segment_meta_mem_tracker(std::make_shared<MemTracker>("SegmentMeta")),
_stop_background_threads_latch(1),
@@ -149,8 +148,7 @@ StorageEngine::~StorageEngine() {
void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
std::vector<std::thread> threads;
for (auto data_dir : data_dirs) {
- threads.emplace_back([this, data_dir] {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+ threads.emplace_back([data_dir] {
auto res = data_dir->load();
if (!res.ok()) {
LOG(WARNING) << "io error when init load tables. res=" << res
@@ -195,8 +193,7 @@ Status StorageEngine::_init_store_map() {
DataDir* store = new DataDir(path.path, path.capacity_bytes,
path.storage_medium,
_tablet_manager.get(),
_txn_manager.get());
tmp_stores.emplace_back(store);
- threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+ threads.emplace_back([store, &error_msg_lock, &error_msg]() {
auto st = store->init();
if (!st.ok()) {
{
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 910c926c69..dcaa78ba56 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -322,8 +322,6 @@ private:
// map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we
need custom hash func
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
- // StorageEngine oneself
- std::shared_ptr<MemTracker> _mem_tracker;
// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
// This mem tracker is only for tracking memory use by segment meta data
such as footer or index page.
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 03d6b0f4d1..9d8633dabc 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -74,7 +74,9 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE
mem_consumption, Labels({{"type",
"tablet_meta"}}));
TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
- : _mem_tracker(std::make_shared<MemTracker>("TabletManager")),
+ : _mem_tracker(std::make_shared<MemTracker>(
+ "TabletManager",
ExecEnv::GetInstance()->experimental_mem_tracker())),
+ _tablet_meta_mem_tracker(std::make_shared<MemTracker>("TabletMeta")),
_tablets_shards_size(tablet_map_lock_shard_size),
_tablets_shards_mask(tablet_map_lock_shard_size - 1) {
CHECK_GT(_tablets_shards_size, 0);
@@ -207,6 +209,10 @@ Status
TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map[tablet_id] = tablet;
_add_tablet_to_partition(tablet);
+ // TODO: remove multiply 2 of tablet meta mem size
+ // Because table schema will copy in tablet, there will be double mem cost
+ // so here multiply 2
+ _tablet_meta_mem_tracker->consume(tablet->tablet_meta()->mem_size() * 2);
VLOG_NOTICE << "add tablet to map successfully."
<< " tablet_id=" << tablet_id;
@@ -489,6 +495,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId
tablet_id, TReplicaId repl
}
to_drop_tablet->deregister_tablet_from_dir();
+
_tablet_meta_mem_tracker->release(to_drop_tablet->tablet_meta()->mem_size() *
2);
return Status::OK();
}
@@ -718,6 +725,7 @@ Status TabletManager::load_tablet_from_meta(DataDir*
data_dir, TTabletId tablet_
TSchemaHash schema_hash, const
string& meta_binary,
bool update_meta, bool force, bool
restore,
bool check_path) {
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
TabletMetaSharedPtr tablet_meta(new TabletMeta());
Status status = tablet_meta->deserialize(meta_binary);
if (!status.ok()) {
@@ -800,6 +808,7 @@ Status TabletManager::load_tablet_from_meta(DataDir*
data_dir, TTabletId tablet_
Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
SchemaHash schema_hash, const
string& schema_hash_path,
bool force, bool restore) {
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
LOG(INFO) << "begin to load tablet from dir. "
<< " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
<< " path = " << schema_hash_path << " force = " << force << "
restore = " << restore;
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 58a60cbee7..30a64937cc 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -202,6 +202,7 @@ private:
// trace the memory use by meta of tablet
std::shared_ptr<MemTracker> _mem_tracker;
+ std::shared_ptr<MemTracker> _tablet_meta_mem_tracker;
const int32_t _tablets_shards_size;
const int32_t _tablets_shards_mask;
diff --git a/be/src/olap/task/engine_batch_load_task.cpp
b/be/src/olap/task/engine_batch_load_task.cpp
index bd3abf3efc..a9135df050 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -50,7 +50,7 @@ using namespace ErrorCode;
EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req,
std::vector<TTabletInfo>* tablet_infos)
: _push_req(push_req), _tablet_infos(tablet_infos) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::BATCHLOAD,
+ MemTrackerLimiter::Type::LOAD,
fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}",
_push_req.push_type,
std::to_string(_push_req.tablet_id)));
}
diff --git a/be/src/olap/task/engine_checksum_task.cpp
b/be/src/olap/task/engine_checksum_task.cpp
index d1e8ae7eb2..c298b104c5 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -26,7 +26,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id,
TSchemaHash schema_h
TVersion version, uint32_t* checksum)
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version),
_checksum(checksum) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::CONSISTENCY,
+ MemTrackerLimiter::Type::LOAD,
"EngineChecksumTask#tabletId=" + std::to_string(tablet_id));
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0b7ebfdadc..e2fde2d9e5 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -262,8 +262,9 @@ public:
_runtime_profile.reset(new RuntimeProfile(
fmt::format("{} (id={})", _operator_builder->get_name(),
_operator_builder->id())));
_sink->profile()->insert_child_head(_runtime_profile.get(), true);
- _mem_tracker = std::make_unique<MemTracker>("DataSinkOperator:" +
_runtime_profile->name(),
- _runtime_profile.get());
+ _mem_tracker =
+ std::make_unique<MemTracker>("DataSinkOperator:" +
_runtime_profile->name(),
+ _runtime_profile.get(), nullptr,
"PeakMemoryUsage");
return Status::OK();
}
@@ -319,8 +320,9 @@ public:
_runtime_profile.reset(new RuntimeProfile(
fmt::format("{} (id={})", _operator_builder->get_name(),
_operator_builder->id())));
_node->runtime_profile()->insert_child_head(_runtime_profile.get(),
true);
- _mem_tracker = std::make_unique<MemTracker>(get_name() + ": " +
_runtime_profile->name(),
- _runtime_profile.get());
+ _mem_tracker =
+ std::make_unique<MemTracker>(get_name() + ": " +
_runtime_profile->name(),
+ _runtime_profile.get(), nullptr,
"PeakMemoryUsage");
_node->increase_ref();
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index d87b47db28..52fb8d2175 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -181,7 +181,6 @@ Status PipelineFragmentContext::prepare(const
doris::TExecPlanFragmentParams& re
// TODO should be combine with plan_fragment_executor.prepare funciton
SCOPED_ATTACH_TASK(get_runtime_state());
- _runtime_state->init_scanner_mem_trackers();
_runtime_state->runtime_filter_mgr()->init();
_runtime_state->set_be_number(request.backend_num);
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c47d15d9fa..f90cb98ba6 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -120,12 +120,10 @@ public:
return nullptr;
}
- void set_orphan_mem_tracker(const std::shared_ptr<MemTrackerLimiter>&
orphan_tracker) {
- _orphan_mem_tracker = orphan_tracker;
- _orphan_mem_tracker_raw = orphan_tracker.get();
- }
+ void init_mem_tracker();
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return
_orphan_mem_tracker; }
MemTrackerLimiter* orphan_mem_tracker_raw() { return
_orphan_mem_tracker_raw; }
+ MemTrackerLimiter* experimental_mem_tracker() { return
_experimental_mem_tracker.get(); }
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
ThreadPool* send_batch_thread_pool() { return
_send_batch_thread_pool.get(); }
ThreadPool* download_cache_thread_pool() { return
_download_cache_thread_pool.get(); }
@@ -215,6 +213,7 @@ private:
// and the consumption of the orphan mem tracker is close to 0, but
greater than 0.
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
MemTrackerLimiter* _orphan_mem_tracker_raw;
+ std::shared_ptr<MemTrackerLimiter> _experimental_mem_tracker;
std::unique_ptr<ThreadPool> _send_batch_thread_pool;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 22fa7f9123..d557e8e0ef 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -166,9 +166,7 @@ Status ExecEnv::_init_mem_env() {
bool is_percent = false;
std::stringstream ss;
// 1. init mem tracker
- _orphan_mem_tracker =
-
std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan");
- _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
+ init_mem_tracker();
thread_context()->thread_mem_tracker_mgr->init();
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) &&
!defined(ADDRESS_SANITIZER) && \
!defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) &&
!defined(USE_JEMALLOC)
@@ -236,6 +234,14 @@ Status ExecEnv::_init_mem_env() {
return Status::OK();
}
+void ExecEnv::init_mem_tracker() {
+ _orphan_mem_tracker =
+
std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan");
+ _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
+ _experimental_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet");
+}
+
void ExecEnv::init_download_cache_buf() {
std::unique_ptr<char[]> download_cache_buf(new
char[config::download_cache_buffer_size]);
memset(download_cache_buf.get(), 0, config::download_cache_buffer_size);
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 8d26c47e02..bd00718472 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -75,9 +75,8 @@ LoadChannelMgr::~LoadChannelMgr() {
Status LoadChannelMgr::init(int64_t process_mem_limit) {
_load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
_load_soft_mem_limit = _load_hard_mem_limit *
config::load_process_soft_mem_limit_percent / 100;
- _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
- _mem_tracker_set =
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
-
"LoadChannelMgrTrackerSet");
+ _mem_tracker =
+ std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
"LoadChannelMgr");
REGISTER_HOOK_METRIC(load_channel_mem_consumption,
[this]() { return _mem_tracker->consumption(); });
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -105,7 +104,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest&
params) {
auto channel_mem_tracker = std::make_unique<MemTracker>(
fmt::format("LoadChannel#senderIp={}#loadID={}",
params.sender_ip(),
load_id.to_string()),
- nullptr,
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+ ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
#else
auto channel_mem_tracker =
std::make_unique<MemTracker>(fmt::format(
"LoadChannel#senderIp={}#loadID={}", params.sender_ip(),
load_id.to_string()));
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index 967617e9bc..a825b74bca 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -58,7 +58,7 @@ public:
std::lock_guard<std::mutex> l(_lock);
_refresh_mem_tracker_without_lock();
}
- MemTrackerLimiter* mem_tracker_set() { return _mem_tracker_set.get(); }
+ MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
private:
template <typename Request>
@@ -74,11 +74,12 @@ private:
// lock should be held when calling this method
void _refresh_mem_tracker_without_lock() {
- int64_t mem_usage = 0;
+ _mem_usage = 0;
for (auto& kv : _load_channels) {
- mem_usage += kv.second->mem_consumption();
+ _mem_usage += kv.second->mem_consumption();
}
- _mem_tracker->set_consumption(mem_usage);
+ THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage -
_mem_tracker->consumption(),
+ _mem_tracker.get());
}
protected:
@@ -89,9 +90,8 @@ protected:
Cache* _last_success_channel = nullptr;
// check the total load channel mem consumption of this Backend
- std::unique_ptr<MemTracker> _mem_tracker;
- // Associate load channel tracker and memtable tracker, avoid default
association to Orphan tracker.
- std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
+ int64_t _mem_usage = 0;
+ std::unique_ptr<MemTrackerLimiter> _mem_tracker;
int64_t _load_hard_mem_limit = -1;
int64_t _load_soft_mem_limit = -1;
bool _soft_reduce_mem_in_progress = false;
diff --git a/be/src/runtime/memory/mem_tracker.cpp
b/be/src/runtime/memory/mem_tracker.cpp
index 7d1c3de7c1..1888f366a5 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -40,8 +40,8 @@ struct TrackerGroup {
static std::vector<TrackerGroup> mem_tracker_pool(1000);
MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile,
MemTrackerLimiter* parent,
- const std::string& profile_counter_name, bool
only_track_alloc)
- : _label(label), _only_track_alloc(only_track_alloc) {
+ const std::string& profile_counter_name)
+ : _label(label) {
if (profile == nullptr) {
_consumption =
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
} else {
@@ -56,7 +56,15 @@ MemTracker::MemTracker(const std::string& label,
RuntimeProfile* profile, MemTra
// release().
_consumption =
profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES);
}
+ bind_parent(parent);
+}
+
+MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) :
_label(label) {
+ _consumption =
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
+ bind_parent(parent);
+}
+void MemTracker::bind_parent(MemTrackerLimiter* parent) {
if (parent) {
_parent_label = parent->label();
_parent_group_num = parent->group_num();
@@ -95,7 +103,7 @@ void
MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot
int64_t group_num, std::string
parent_label) {
std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
for (auto tracker : mem_tracker_pool[group_num].trackers) {
- if (tracker->parent_label() == parent_label) {
+ if (tracker->parent_label() == parent_label && tracker->consumption()
!= 0) {
snapshots->push_back(tracker->make_snapshot());
}
}
diff --git a/be/src/runtime/memory/mem_tracker.h
b/be/src/runtime/memory/mem_tracker.h
index 46d1ec24d4..509362397a 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -44,10 +44,9 @@ public:
};
// Creates and adds the tracker to the mem_tracker_pool.
- MemTracker(const std::string& label, RuntimeProfile* profile = nullptr,
- MemTrackerLimiter* parent = nullptr,
- const std::string& profile_counter_name = "PeakMemoryUsage",
- bool only_track_alloc = false);
+ MemTracker(const std::string& label, RuntimeProfile* profile,
MemTrackerLimiter* parent,
+ const std::string& profile_counter_name);
+ MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr);
// For MemTrackerLimiter
MemTracker() { _parent_group_num = -1; }
@@ -61,13 +60,13 @@ public:
public:
const std::string& label() const { return _label; }
const std::string& parent_label() const { return _parent_label; }
+ const std::string& set_parent_label() const { return _parent_label; }
// Returns the memory consumed in bytes.
int64_t consumption() const { return _consumption->current_value(); }
int64_t peak_consumption() const { return _consumption->value(); }
void consume(int64_t bytes) {
if (bytes == 0) return;
- if (bytes < 0 && _only_track_alloc) return;
_consumption->add(bytes);
}
void release(int64_t bytes) { consume(-bytes); }
@@ -89,6 +88,8 @@ public:
}
protected:
+ void bind_parent(MemTrackerLimiter* parent);
+
// label used in the make snapshot, not guaranteed unique.
std::string _label;
@@ -96,10 +97,9 @@ protected:
// Tracker is located in group num in mem_tracker_pool
int64_t _parent_group_num = 0;
+ // Use _parent_label to correlate with parent limiter tracker.
std::string _parent_label = "-";
- bool _only_track_alloc = false;
-
// Iterator into mem_tracker_pool for this object. Stored to have O(1)
remove.
std::list<MemTracker*>::iterator _tracker_group_it;
};
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 05e53baaf9..87ef52fbed 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -97,9 +97,8 @@ MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const
{
void MemTrackerLimiter::refresh_global_counter() {
std::unordered_map<Type, int64_t> type_mem_sum = {
- {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0},
- {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0},
- {Type::BATCHLOAD, 0}, {Type::CONSISTENCY, 0}};
+ {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0},
{Type::COMPACTION, 0},
+ {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0}}; // No need refresh
Type::EXPERIMENTAL
for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) {
std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
@@ -327,6 +326,11 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
}
}
+ // Minor gc does not cancel when there is only one query. Full gc covers that case.
+ if (query_consumption.size() <= 1) {
+ return 0;
+ }
+
std::priority_queue<std::pair<int64_t, std::string>> max_pq;
// Min-heap to Max-heap.
while (!min_pq.empty()) {
@@ -341,15 +345,16 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
int64_t query_mem = query_consumption[max_pq.top().second];
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
cancelled_queryid,
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
- fmt::format("Process has no memory available, cancel top
memory usage query: "
+ fmt::format("Process has less memory, cancel top memory
overcommit query: "
"query memory tracker <{}> consumption {}, backend
{} "
- "process memory used {} exceed limit {} or sys mem
available {} "
- "less than low water mark {}. Execute again after
enough memory, "
+ "process memory used {} exceed soft limit {} or
sys mem available {} "
+ "less than warning water mark {}. Execute again
after enough memory, "
"details see be.INFO.",
max_pq.top().second, print_bytes(query_mem),
BackendOptions::get_localhost(),
PerfCounters::get_vm_rss_str(),
- MemInfo::mem_limit_str(),
MemInfo::sys_mem_available_str(),
-
print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+ print_bytes(MemInfo::soft_mem_limit()),
+ MemInfo::sys_mem_available_str(),
+
print_bytes(MemInfo::sys_mem_available_warning_water_mark())));
usage_strings.push_back(fmt::format("{} memory usage {} Bytes,
overcommit ratio: {}",
max_pq.top().second, query_mem,
max_pq.top().first));
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 7696908dab..bb346a2a3b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -46,8 +46,8 @@ public:
COMPACTION = 3, // Count the memory consumption of all Base and
Cumulative tasks.
SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange
tasks.
CLONE = 5, // Count the memory consumption of all EngineCloneTask.
Note: Memory that does not contain make/release snapshots.
- BATCHLOAD = 6, // Count the memory consumption of all
EngineBatchLoadTask.
- CONSISTENCY = 7 // Count the memory consumption of all
EngineChecksumTask.
+ EXPERIMENTAL =
+ 6 // Experimental memory statistics, usually inaccurate, used
for debugging, and expect to add other types in the future.
};
inline static std::unordered_map<Type,
std::shared_ptr<RuntimeProfile::HighWaterMarkCounter>>
@@ -63,14 +63,11 @@ public:
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
{Type::CLONE,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
- {Type::BATCHLOAD,
-
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
- {Type::CONSISTENCY,
+ {Type::EXPERIMENTAL,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}};
- inline static const std::string TypeString[] = {"global", "query",
"load",
- "compaction",
"schema_change", "clone",
- "batch_load",
"consistency"};
+ inline static const std::string TypeString[] = {
+ "global", "query", "load", "compaction", "schema_change", "clone",
"experimental"};
public:
// byte_limit equal to -1 means no consumption limit, only participate in
process memory statistics.
@@ -161,7 +158,7 @@ public:
static std::string process_mem_log_str() {
return fmt::format(
- "physical memory {}, process memory used {} limit {}, sys mem available {} low "
+ "OS physical memory {}, process memory used {} limit {}, sys mem available {} low "
"water mark {}, refresh interval memory growth {} B",
PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index f36cefaf51..748191b177 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -100,7 +100,6 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request,
_runtime_state->set_tracer(std::move(tracer));
SCOPED_ATTACH_TASK(_runtime_state.get());
- _runtime_state->init_scanner_mem_trackers();
_runtime_state->runtime_filter_mgr()->init();
_runtime_state->set_be_number(request.backend_num);
if (request.__isset.backend_id) {
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index bf0bfd0d9d..4d5478a596 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -45,7 +45,8 @@ RuntimeFilterMgr::~RuntimeFilterMgr() {}
Status RuntimeFilterMgr::init() {
DCHECK(_state->query_mem_tracker() != nullptr);
- _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr");
+ _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
+ ExecEnv::GetInstance()->experimental_mem_tracker());
return Status::OK();
}
@@ -161,7 +162,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId
query_id, UniqueId frag
const TQueryOptions&
query_options) {
_query_id = query_id;
_fragment_instance_id = fragment_instance_id;
- _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity");
+ _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity",
+ ExecEnv::GetInstance()->experimental_mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter)
{
int filter_id = filterid_to_desc.first;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 044a619e03..c3bd7d0c3f 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -216,8 +216,6 @@ Status RuntimeState::init(const TUniqueId&
fragment_instance_id, const TQueryOpt
Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
_query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::QUERY, fmt::format("TestQuery#Id={}",
print_id(query_id)));
- _scanner_mem_tracker =
- std::make_shared<MemTracker>(fmt::format("TestScanner#QueryId={}", print_id(query_id)));
return Status::OK();
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ee22760ab1..d72631de92 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -75,11 +75,6 @@ public:
Status init(const TUniqueId& fragment_instance_id, const TQueryOptions&
query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env);
- // after SCOPED_ATTACH_TASK;
- void init_scanner_mem_trackers() {
- _scanner_mem_tracker = std::make_shared<MemTracker>(
- fmt::format("Scanner#QueryId={}", print_id(_query_id)));
- }
// for ut and non-query.
Status init_mem_trackers(const TUniqueId& query_id = TUniqueId());
@@ -111,7 +106,6 @@ public:
const TUniqueId& fragment_instance_id() const { return
_fragment_instance_id; }
ExecEnv* exec_env() { return _exec_env; }
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return
_query_mem_tracker; }
- std::shared_ptr<MemTracker> scanner_mem_tracker() { return _scanner_mem_tracker; }
ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; }
void set_fragment_root_id(PlanNodeId id) {
@@ -431,8 +425,6 @@ private:
static const int DEFAULT_BATCH_SIZE = 2048;
std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
- // Count the memory consumption of Scanner
- std::shared_ptr<MemTracker> _scanner_mem_tracker;
// put runtime state before _obj_pool, so that it will be deconstructed
after
// _obj_pool. Because some of object in _obj_pool will use profile when
deconstructing.
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index c6e6ecea45..fd87c56f98 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -356,7 +356,6 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true,
true);
runtime_profile()->add_child(memory_usage, false, nullptr);
@@ -662,7 +661,6 @@ Status HashJoinNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(VJoinNodeBase::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_CANCELLED(state);
return Status::OK();
}
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 5e5c24fcfc..e186f15d8c 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -176,7 +176,6 @@ Status VJoinNodeBase::init(const TPlanNode& tnode,
RuntimeState* state) {
Status VJoinNodeBase::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::open");
RETURN_IF_ERROR(ExecNode::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_CANCELLED(state);
std::promise<Status> thread_status;
@@ -220,7 +219,6 @@ void VJoinNodeBase::_reset_tuple_is_null_column() {
void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state,
std::promise<Status>* status) {
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VJoinNodeBase::_hash_table_build_thread");
SCOPED_ATTACH_TASK(state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared());
status->set_value(child(0)->open(state));
}
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 39ae99a36e..00ed59bcbd 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -103,7 +103,6 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows",
TUnit::UNIT);
@@ -237,7 +236,6 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state,
Block* block, bool* eo
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_TIMER(_probe_timer);
RETURN_IF_CANCELLED(state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
while (need_more_input_data()) {
RETURN_IF_ERROR(_fresh_left_block(state));
@@ -605,7 +603,6 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VNestedLoopJoinNode::open")
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(VJoinNodeBase::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_CANCELLED(state);
// We can close the right child to release its resources because its input
has been
// fully consumed.
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp
b/be/src/vec/exec/scan/new_es_scan_node.cpp
index 61f7f4022c..5b22c2ef19 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -76,7 +76,6 @@ Status NewEsScanNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status NewEsScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
RETURN_IF_ERROR(VScanNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
index b328151a92..19fd93b9d3 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -37,7 +37,6 @@ std::string NewJdbcScanNode::get_name() {
Status NewJdbcScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << "VNewJdbcScanNode::Prepare";
RETURN_IF_ERROR(VScanNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp
b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
index 2bd6ad4c34..dbbf57a120 100644
--- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
@@ -38,7 +38,6 @@ std::string NewOdbcScanNode::get_name() {
Status NewOdbcScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
RETURN_IF_ERROR(VScanNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index a84443d38d..c6df1722cb 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -46,7 +46,6 @@ Status
NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
}
Status NewOlapScanNode::prepare(RuntimeState* state) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(VScanNode::prepare(state));
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index f5e270c606..fe7356d0e8 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -191,7 +191,6 @@ void ScannerScheduler::_schedule_scanners(ScannerContext*
ctx) {
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
ScannerContext* ctx,
VScanner* scanner) {
SCOPED_ATTACH_TASK(scanner->runtime_state());
- SCOPED_CONSUME_MEM_TRACKER(scanner->runtime_state()->scanner_mem_tracker());
Thread::set_self_name("_scanner_scan");
scanner->update_wait_worker_timer();
scanner->start_scan_cpu_timer();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 78e7d91bd3..ebd4132281 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -75,7 +75,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState*
state) {
Status VScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
// init profile for runtime filter
for (auto& rf_ctx : _runtime_filter_ctxs) {
@@ -85,7 +84,6 @@ Status VScanNode::prepare(RuntimeState* state) {
}
Status VScanNode::open(RuntimeState* state) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
@@ -96,7 +94,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
if (_opened) {
return Status::OK();
}
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
_input_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VScanNode::alloc_resource");
@@ -125,7 +122,6 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VScanNode::get_next");
SCOPED_TIMER(_get_next_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
// in inverted index apply logic, in order to optimize query performance,
// we built some temporary columns into block, these columns only used in
scan node level,
// remove them when query leave scan node to avoid other nodes use
block->columns() to make a wrong decision
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 89504b20ef..6b33cba1d3 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -446,7 +446,6 @@ Status AggregationNode::prepare_profile(RuntimeState*
state) {
}
Status AggregationNode::prepare(RuntimeState* state) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
@@ -456,7 +455,6 @@ Status AggregationNode::prepare(RuntimeState* state) {
Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
@@ -534,7 +532,6 @@ Status AggregationNode::get_next(RuntimeState* state,
Block* block, bool* eos) {
_children[0]->get_next_span(), _child_eos);
};
{
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
if (_preagg_block.rows() != 0) {
RETURN_IF_ERROR(do_pre_agg(&_preagg_block, block));
} else {
@@ -542,7 +539,6 @@ Status AggregationNode::get_next(RuntimeState* state,
Block* block, bool* eos) {
}
}
} else {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(pull(state, block, eos));
}
return Status::OK();
@@ -758,7 +754,7 @@ Status AggregationNode::_merge_without_key(Block* block) {
void AggregationNode::_update_memusage_without_key() {
auto arena_memory_usage = _agg_arena_pool->size() -
_mem_usage_record.used_in_arena;
- mem_tracker_held()->consume(arena_memory_usage);
+ mem_tracker()->consume(arena_memory_usage);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
_mem_usage_record.used_in_arena = _agg_arena_pool->size();
}
@@ -1370,9 +1366,9 @@ void
AggregationNode::_update_memusage_with_serialized_key() {
auto arena_memory_usage = _agg_arena_pool->size() +
_aggregate_data_container->memory_usage() -
_mem_usage_record.used_in_arena;
- mem_tracker_held()->consume(arena_memory_usage);
- mem_tracker_held()->consume(data.get_buffer_size_in_bytes() -
- _mem_usage_record.used_in_state);
+ mem_tracker()->consume(arena_memory_usage);
+ mem_tracker()->consume(data.get_buffer_size_in_bytes() -
+ _mem_usage_record.used_in_state);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_UPDATE(_hash_table_memory_usage,
data.get_buffer_size_in_bytes() -
_mem_usage_record.used_in_state);
@@ -1399,7 +1395,7 @@ void AggregationNode::_close_with_serialized_key() {
}
void AggregationNode::release_tracker() {
- mem_tracker_held()->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena);
+ mem_tracker()->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena);
}
void AggregationNode::_release_mem() {
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp
b/be/src/vec/exec/vanalytic_eval_node.cpp
index b2235de9ab..237d583cc0 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -140,7 +140,6 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode,
RuntimeState* state) {
}
Status VAnalyticEvalNode::prepare(RuntimeState* state) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor));
@@ -211,7 +210,6 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
Status VAnalyticEvalNode::open(RuntimeState* state) {
RETURN_IF_ERROR(alloc_resource(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(child(0)->open(state));
return Status::OK();
}
@@ -226,13 +224,11 @@ Status VAnalyticEvalNode::close(RuntimeState* state) {
Status VAnalyticEvalNode::alloc_resource(RuntimeState* state) {
{
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VAnalyticEvalNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_CANCELLED(state);
}
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state));
RETURN_IF_ERROR(VExpr::open(_order_by_eq_expr_ctxs, state));
for (size_t i = 0; i < _agg_functions_size; ++i) {
@@ -322,14 +318,12 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state,
vectorized::Block* block
return Status::OK();
}
size_t current_block_rows = _input_blocks[_output_block_index].rows();
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(_executor.get_next(current_block_rows));
if (_window_end_position == current_block_rows) {
break;
}
}
RETURN_IF_ERROR(_output_current_block(block));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block,
block->columns()));
reached_limit(block, eos);
return Status::OK();
@@ -393,8 +387,6 @@ Status
VAnalyticEvalNode::_consumed_block_and_init_partition(RuntimeState* state
found_partition_end = _get_partition_by_end(); //claculate new
partition end
}
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-
if (_input_eos && _input_total_rows == 0) {
*eos = true;
return Status::OK();
@@ -528,8 +520,6 @@ Status VAnalyticEvalNode::sink(doris::RuntimeState*
/*state*/, vectorized::Block
return Status::OK();
}
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-
input_block_first_row_positions.emplace_back(_input_total_rows);
size_t block_rows = input_block->rows();
_input_total_rows += block_rows;
@@ -566,7 +556,7 @@ Status VAnalyticEvalNode::sink(doris::RuntimeState*
/*state*/, vectorized::Block
_ordey_by_column_idxs[i] = result_col_id;
}
- mem_tracker_held()->consume(input_block->allocated_bytes());
+ mem_tracker()->consume(input_block->allocated_bytes());
_blocks_memory_usage->add(input_block->allocated_bytes());
//TODO: if need improvement, the is a tips to maintain a free queue,
@@ -627,11 +617,9 @@ void VAnalyticEvalNode::_insert_result_info(int64_t
current_block_rows) {
}
Status VAnalyticEvalNode::_output_current_block(Block* block) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-
block->swap(std::move(_input_blocks[_output_block_index]));
_blocks_memory_usage->add(-block->allocated_bytes());
- mem_tracker_held()->consume(-block->allocated_bytes());
+ mem_tracker()->consume(-block->allocated_bytes());
if (_origin_cols.size() < block->columns()) {
block->erase_not_in(_origin_cols);
}
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp
b/be/src/vec/exec/vbroker_scan_node.cpp
index ac96f97965..a94b73592a 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -59,7 +59,6 @@ Status VBrokerScanNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status VBrokerScanNode::prepare(RuntimeState* state) {
VLOG_QUERY << "VBrokerScanNode prepare";
RETURN_IF_ERROR(ScanNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
// get tuple desc
_runtime_state = state;
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -85,7 +84,6 @@ Status VBrokerScanNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBrokerScanNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(start_scanners());
@@ -109,7 +107,6 @@ Status VBrokerScanNode::start_scanners() {
Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block*
block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VBrokerScanNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
// check if CANCELLED.
if (state->is_cancelled()) {
std::unique_lock<std::mutex> l(_batch_queue_lock);
@@ -271,7 +268,6 @@ Status VBrokerScanNode::scanner_scan(const
TBrokerScanRange& scan_range, Scanner
void VBrokerScanNode::scanner_worker(int start_idx, int length) {
START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span,
"VBrokerScanNode::scanner_worker");
SCOPED_ATTACH_TASK(_runtime_state);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared());
Thread::set_self_name("vbroker_scanner");
Status status = Status::OK();
ScannerCounter counter;
diff --git a/be/src/vec/exec/vexchange_node.cpp
b/be/src/vec/exec/vexchange_node.cpp
index 272116cbe1..069caaf635 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -54,7 +54,6 @@ Status VExchangeNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status VExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
DCHECK_GT(_num_senders, 0);
_sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
_stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
@@ -83,7 +82,6 @@ Status VExchangeNode::alloc_resource(RuntimeState* state) {
Status VExchangeNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExchangeNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(ExecNode::open(state));
return Status::OK();
@@ -92,7 +90,6 @@ Status VExchangeNode::open(RuntimeState* state) {
Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VExchangeNode::get_next");
SCOPED_TIMER(runtime_profile()->total_time_counter());
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
if (_is_merging && state->enable_pipeline_exec() && !_is_ready) {
RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
_is_asc_order,
_nulls_first,
diff --git a/be/src/vec/exec/vmysql_scan_node.cpp
b/be/src/vec/exec/vmysql_scan_node.cpp
index 3fb03bd19c..dcc7593ee9 100644
--- a/be/src/vec/exec/vmysql_scan_node.cpp
+++ b/be/src/vec/exec/vmysql_scan_node.cpp
@@ -49,7 +49,6 @@ Status VMysqlScanNode::prepare(RuntimeState* state) {
}
RETURN_IF_ERROR(ScanNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -103,7 +102,6 @@ Status VMysqlScanNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlScanNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
VLOG_CRITICAL << "MysqlScanNode::Open";
if (!_is_init) {
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 8a090ad0d6..c3fe0e8d0f 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -44,7 +44,6 @@ Status VRepeatNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
if (_output_tuple_desc == nullptr) {
return Status::InternalError("Failed to get tuple descriptor.");
@@ -72,7 +71,6 @@ Status VRepeatNode::alloc_resource(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VRepeatNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(VExpr::open(_expr_ctxs, state));
return Status::OK();
}
diff --git a/be/src/vec/exec/vschema_scan_node.cpp
b/be/src/vec/exec/vschema_scan_node.cpp
index 9e3e6a1c42..65588eb11b 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -122,7 +122,6 @@ Status VSchemaScanNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(ExecNode::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
if (_scanner_param.user) {
TSetSessionParams param;
@@ -145,7 +144,6 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
}
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VSchemaScanNode::prepare");
RETURN_IF_ERROR(ScanNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
// new one mem pool
_tuple_pool.reset(new (std::nothrow) MemPool());
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index 9ad65dbf1c..253d360569 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -193,7 +193,6 @@ Status VSetOperationNode<is_intersect>::open(RuntimeState*
state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VSetOperationNode<is_intersect>::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
// TODO: build the hash table in a thread to open other children
asynchronously.
RETURN_IF_ERROR(hash_table_build(state));
@@ -234,7 +233,6 @@ template <bool is_intersect>
Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
_pull_timer = ADD_TIMER(runtime_profile(), "PullTime");
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 4ebdd05506..fd7c88af75 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -87,7 +87,6 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState*
state) {
}
Status VSortNode::prepare(RuntimeState* state) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
_runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" :
"true");
@@ -102,7 +101,6 @@ Status VSortNode::prepare(RuntimeState* state) {
Status VSortNode::alloc_resource(doris::RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("vsort, while open."));
@@ -161,13 +159,12 @@ Status VSortNode::open(RuntimeState* state) {
_children[0], std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)),
child(0)->get_next_span(), eos);
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(sink(state, upstream_block.get(), eos));
} while (!eos);
child(0)->close(state);
- mem_tracker_held()->consume(_sorter->data_size());
+ mem_tracker()->consume(_sorter->data_size());
COUNTER_UPDATE(_sort_blocks_memory_usage, _sorter->data_size());
return Status::OK();
@@ -183,7 +180,6 @@ Status VSortNode::pull(doris::RuntimeState* state,
vectorized::Block* output_blo
}
Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VSortNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
diff --git a/be/src/vec/exec/vtable_function_node.cpp
b/be/src/vec/exec/vtable_function_node.cpp
index c3e9f2a829..f7fe4f71d6 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -83,7 +83,6 @@ bool VTableFunctionNode::_is_inner_and_empty() {
Status VTableFunctionNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
_num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered",
TUnit::UNIT);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 3efb84b4c0..f58e3d1652 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -308,8 +308,9 @@ VDataStreamRecvr::VDataStreamRecvr(
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
_enable_pipeline(state->enable_pipeline_exec()) {
// DataStreamRecvr may be destructed after the instance execution thread
ends.
- _mem_tracker = std::make_unique<MemTracker>(
- "VDataStreamRecvr:" + print_id(_fragment_instance_id), _profile);
+ _mem_tracker =
+ std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id),
+ _profile, nullptr, "PeakMemoryUsage");
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// Create one queue per sender if is_merging is true.
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 1d36902561..8b87ff4840 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -421,7 +421,8 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_profile = _pool->add(new RuntimeProfile(title));
SCOPED_TIMER(_profile->total_time_counter());
_mem_tracker = std::make_unique<MemTracker>(
- "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile);
+ "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile, nullptr,
+ "PeakMemoryUsage");
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
if (_part_type == TPartitionType::UNPARTITIONED || _part_type ==
TPartitionType::RANDOM) {
diff --git a/be/test/testutil/run_all_tests.cpp
b/be/test/testutil/run_all_tests.cpp
index 99a338a5aa..cd6499831f 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -28,10 +28,7 @@
#include "util/mem_info.h"
int main(int argc, char** argv) {
- std::shared_ptr<doris::MemTrackerLimiter> orphan_mem_tracker =
- std::make_shared<doris::MemTrackerLimiter>(doris::MemTrackerLimiter::Type::GLOBAL,
- "Orphan");
- doris::ExecEnv::GetInstance()->set_orphan_mem_tracker(orphan_mem_tracker);
+ doris::ExecEnv::GetInstance()->init_mem_tracker();
doris::thread_context()->thread_mem_tracker_mgr->init();
doris::TabletSchemaCache::create_global_schema_cache();
doris::StoragePageCache::create_global_cache(1 << 30, 10);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]