This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1d52edc1ced [fix](counters) fix MemoryUsage and PeakMemoryUsage counters of some operators (#41602) (#41886)
1d52edc1ced is described below

commit 1d52edc1ced3eb14b209f906564874a463e57f7f
Author: TengJianPing <[email protected]>
AuthorDate: Mon Jan 13 17:47:25 2025 +0800

    [fix](counters) fix MemoryUsage and PeakMemoryUsage counters of some operators (#41602) (#41886)
    
    BP #41602
---
 be/src/pipeline/dependency.cpp                     |  2 +-
 be/src/pipeline/dependency.h                       | 13 ++--
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 46 +++++-------
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |  4 --
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  5 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  1 +
 be/src/pipeline/exec/analytic_source_operator.cpp  |  1 -
 be/src/pipeline/exec/assert_num_rows_operator.cpp  |  2 -
 be/src/pipeline/exec/cache_source_operator.cpp     |  1 -
 .../distinct_streaming_aggregation_operator.cpp    |  1 -
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 16 +++++
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 42 +++++++----
 be/src/pipeline/exec/exchange_sink_operator.h      |  1 -
 be/src/pipeline/exec/exchange_source_operator.cpp  |  6 +-
 .../exec/group_commit_block_sink_operator.cpp      |  3 -
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 84 +++++++++++-----------
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 -
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  8 ++-
 .../pipeline/exec/join/process_hash_table_probe.h  |  1 -
 .../exec/join/process_hash_table_probe_impl.h      |  7 +-
 .../exec/multi_cast_data_stream_source.cpp         |  1 -
 be/src/pipeline/exec/operator.cpp                  | 40 ++++++-----
 be/src/pipeline/exec/operator.h                    | 17 ++---
 .../pipeline/exec/partition_sort_sink_operator.cpp |  4 +-
 .../exec/partition_sort_source_operator.cpp        |  2 -
 .../exec/partitioned_aggregation_sink_operator.cpp | 10 +--
 .../exec/partitioned_hash_join_probe_operator.cpp  |  3 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  6 +-
 be/src/pipeline/exec/repeat_operator.cpp           |  1 -
 be/src/pipeline/exec/scan_operator.h               | 10 ++-
 be/src/pipeline/exec/sort_sink_operator.cpp        |  8 ++-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 11 +--
 .../exec/streaming_aggregation_operator.cpp        | 57 +++++++--------
 .../pipeline/exec/streaming_aggregation_operator.h |  6 --
 .../local_exchange_source_operator.cpp             |  6 +-
 be/src/pipeline/local_exchange/local_exchanger.cpp |  3 +-
 be/src/vec/runtime/partitioner.cpp                 | 14 ++--
 be/src/vec/runtime/partitioner.h                   |  6 +-
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  7 +-
 be/src/vec/runtime/vdata_stream_mgr.h              |  4 ++
 be/src/vec/runtime/vdata_stream_recvr.cpp          | 19 ++---
 be/src/vec/runtime/vdata_stream_recvr.h            |  7 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  2 -
 44 files changed, 246 insertions(+), 246 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 560efec94e1..93117fa71a0 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -199,7 +199,7 @@ void LocalExchangeSharedState::sub_running_source_operators(
 
 LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
     source_deps.resize(num_instances, nullptr);
-    mem_trackers.resize(num_instances, nullptr);
+    mem_counters.resize(num_instances, nullptr);
 }
 
 vectorized::MutableColumns AggSharedState::_get_keys_hash_table() {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index c689ec7ee6a..fea6d9cb7bb 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -324,11 +324,6 @@ public:
     vectorized::Sizes offsets_of_aggregate_states;
     std::vector<size_t> make_nullable_keys;
 
-    struct MemoryRecord {
-        int64_t used_in_arena {};
-        int64_t used_in_state {};
-    };
-    MemoryRecord mem_usage_record;
     bool agg_data_created_without_key = false;
     bool enable_spill = false;
     bool reach_limit = false;
@@ -826,7 +821,7 @@ public:
     LocalExchangeSharedState(int num_instances);
     ~LocalExchangeSharedState() override;
     std::unique_ptr<ExchangerBase> exchanger {};
-    std::vector<MemTracker*> mem_trackers;
+    std::vector<RuntimeProfile::Counter*> mem_counters;
     std::atomic<int64_t> mem_usage = 0;
     // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
@@ -862,13 +857,15 @@ public:
     }
 
     void add_mem_usage(int channel_id, size_t delta, bool 
update_total_mem_usage = true) {
-        mem_trackers[channel_id]->consume(delta);
+        mem_counters[channel_id]->update(delta);
         if (update_total_mem_usage) {
             add_total_mem_usage(delta, channel_id);
         }
     }
 
-    void sub_mem_usage(int channel_id, size_t delta) { 
mem_trackers[channel_id]->release(delta); }
+    void sub_mem_usage(int channel_id, size_t delta) {
+        mem_counters[channel_id]->update(-(int64_t)delta);
+    }
 
     virtual void add_total_mem_usage(size_t delta, int channel_id) {
         if (mem_usage.fetch_add(delta) + delta > 
config::local_exchange_buffer_mem_limit) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 627dbf4ef41..f5caaf30bfc 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     _agg_data = Base::_shared_state->agg_data.get();
     _agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
     _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", 
TUnit::UNIT);
-    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"HashTable",
-                                                            TUnit::BYTES, 
"MemoryUsage", 1);
-    _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
-            "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", 
TUnit::BYTES, 1);
+    _serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL(
+            Base::profile(), "MemoryUsageSerializeKeyArena", TUnit::BYTES, 1);
 
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
@@ -223,24 +223,17 @@ void 
AggSinkLocalState::_update_memusage_with_serialized_key() {
                        },
                        [&](auto& agg_method) -> void {
                            auto& data = *agg_method.hash_table;
-                           auto arena_memory_usage =
+                           int64_t arena_memory_usage =
                                    _agg_arena_pool->size() +
-                                   
Base::_shared_state->aggregate_data_container->memory_usage() -
-                                   
Base::_shared_state->mem_usage_record.used_in_arena;
-                           Base::_mem_tracker->consume(arena_memory_usage);
-                           Base::_mem_tracker->consume(
-                                   data.get_buffer_size_in_bytes() -
-                                   
Base::_shared_state->mem_usage_record.used_in_state);
-                           
_serialize_key_arena_memory_usage->add(arena_memory_usage);
-                           COUNTER_UPDATE(
-                                   _hash_table_memory_usage,
-                                   data.get_buffer_size_in_bytes() -
-                                           
Base::_shared_state->mem_usage_record.used_in_state);
-                           Base::_shared_state->mem_usage_record.used_in_state 
=
-                                   data.get_buffer_size_in_bytes();
-                           Base::_shared_state->mem_usage_record.used_in_arena 
=
-                                   _agg_arena_pool->size() +
-                                   
Base::_shared_state->aggregate_data_container->memory_usage();
+                                   
_shared_state->aggregate_data_container->memory_usage();
+                           int64_t hash_table_memory_usage = 
data.get_buffer_size_in_bytes();
+
+                           COUNTER_SET(_memory_used_counter,
+                                       arena_memory_usage + 
hash_table_memory_usage);
+                           COUNTER_SET(_peak_memory_usage_counter, 
_memory_used_counter->value());
+
+                           COUNTER_SET(_serialize_key_arena_memory_usage, 
arena_memory_usage);
+                           COUNTER_SET(_hash_table_memory_usage, 
hash_table_memory_usage);
                        }},
                _agg_data->method_variant);
 }
@@ -419,11 +412,10 @@ Status 
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
 }
 
 void AggSinkLocalState::_update_memusage_without_key() {
-    auto arena_memory_usage =
-            _agg_arena_pool->size() - 
Base::_shared_state->mem_usage_record.used_in_arena;
-    Base::_mem_tracker->consume(arena_memory_usage);
-    _serialize_key_arena_memory_usage->add(arena_memory_usage);
-    Base::_shared_state->mem_usage_record.used_in_arena = 
_agg_arena_pool->size();
+    int64_t arena_memory_usage = _agg_arena_pool->size();
+    COUNTER_SET(_memory_used_counter, arena_memory_usage);
+    COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
+    COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
 }
 
 Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* 
block) {
@@ -875,8 +867,6 @@ Status AggSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
 
     std::vector<char> tmp_deserialize_buffer;
     _deserialize_buffer.swap(tmp_deserialize_buffer);
-    
Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state 
+
-                                
Base::_shared_state->mem_usage_record.used_in_arena);
     return Base::close(state, exec_status);
 }
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 975b04477f2..e5209425f76 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -106,7 +106,7 @@ protected:
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = 
nullptr;
+    RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr;
 
     bool _should_limit_output = false;
 
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index a406bdc329e..fbc71ac06ad 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -450,8 +450,6 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, 
bool* eos) {
             vectorized::Block::filter_block_internal(block, 
_shared_state->need_computes);
             if (auto rows = block->rows()) {
                 _num_rows_returned += rows;
-                COUNTER_UPDATE(_blocks_returned_counter, 1);
-                COUNTER_SET(_rows_returned_counter, _num_rows_returned);
             }
         } else {
             reached_limit(block, eos);
@@ -459,8 +457,6 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, 
bool* eos) {
     } else {
         if (auto rows = block->rows()) {
             _num_rows_returned += rows;
-            COUNTER_UPDATE(_blocks_returned_counter, 1);
-            COUNTER_SET(_rows_returned_counter, _num_rows_returned);
         }
     }
 }
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 839a485f2d9..7c59c954368 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -33,6 +33,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     _compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime");
     _compute_partition_by_timer = ADD_TIMER(profile(), 
"ComputePartitionByTime");
     _compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime");
+    _blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, 
"MemoryUsageBlocks", TUnit::BYTES, 1);
     return Status::OK();
 }
 
@@ -322,8 +323,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block
         }
     }
 
-    COUNTER_UPDATE(local_state._memory_used_counter, 
input_block->allocated_bytes());
+    int64_t block_mem_usage = input_block->allocated_bytes();
+    COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage);
     COUNTER_SET(local_state._peak_memory_usage_counter, 
local_state._memory_used_counter->value());
+    COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage);
 
     //TODO: if need improvement, the is a tips to maintain a free queue,
     //so the memory could reuse, no need to new/delete again;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 084998d2c36..5b08f0a6e3d 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -61,6 +61,7 @@ private:
     RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
     RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
     RuntimeProfile::Counter* _compute_order_by_timer = nullptr;
+    RuntimeProfile::Counter* _blocks_memory_usage = nullptr;
 
     std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
 };
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 06a6374bbae..205e6e7d3e2 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -444,7 +444,6 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos 
found_partition_end) {
 Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
     block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
     _blocks_memory_usage->add(-block->allocated_bytes());
-    mem_tracker()->consume(-block->allocated_bytes());
     if (_shared_state->origin_cols.size() < block->columns()) {
         block->erase_not_in(_shared_state->origin_cols);
     }
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp 
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 563c4bf49ca..f4ad31a6f90 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -115,8 +115,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
         return Status::Cancelled("Expected {} {} to be returned by expression 
{}",
                                  to_string_lambda(_assertion), 
_desired_num_rows, _subquery_string);
     }
-    COUNTER_SET(local_state.rows_returned_counter(), 
local_state.num_rows_returned());
-    COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
     
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
                                                            block->columns()));
     return Status::OK();
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp 
b/be/src/pipeline/exec/cache_source_operator.cpp
index e98a18b76a3..1387de351d1 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -159,7 +159,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* b
             local_state._current_query_cache_rows += output_block->rows();
             auto mem_consume = output_block->allocated_bytes();
             local_state._current_query_cache_bytes += mem_consume;
-            local_state._mem_tracker->consume(mem_consume);
 
             if (_cache_param.entry_max_bytes < 
local_state._current_query_cache_bytes ||
                 _cache_param.entry_max_rows < 
local_state._current_query_cache_rows) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 75b26c3ed18..2f4a046003f 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -461,7 +461,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* 
state, vectorized::Bloc
                                                                
block->columns()));
     }
     local_state.add_num_rows_returned(block->rows());
-    COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
     // If the limit is not reached, it is important to ensure that _aggregated_block is empty
     // because it may still contain data.
     // However, if the limit is reached, there is no need to output data even if some exists.
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 1753991fe52..45aeda8d9d3 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -156,6 +156,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& 
request) {
         if (request.block) {
             RETURN_IF_ERROR(
                     
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
+            COUNTER_UPDATE(_parent->memory_used_counter(), 
request.block->ByteSizeLong());
+            COUNTER_SET(_parent->peak_memory_usage_counter(),
+                        _parent->memory_used_counter()->value());
         }
         _instance_to_package_queue[ins_id].emplace(std::move(request));
         _total_queue_size++;
@@ -291,6 +294,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         if (request.block) {
             static_cast<void>(brpc_request->release_block());
+            COUNTER_UPDATE(_parent->memory_used_counter(), 
-request.block->ByteSizeLong());
         }
         q.pop();
         _total_queue_size--;
@@ -416,12 +420,24 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId 
id) {
     _turn_off_channel(id, lock);
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
             _instance_to_broadcast_package_queue[id];
+    for (; !broadcast_q.empty(); broadcast_q.pop()) {
+        if (broadcast_q.front().block_holder->get_block()) {
+            COUNTER_UPDATE(_parent->memory_used_counter(),
+                           
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+        }
+    }
     {
         std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> 
empty;
         swap(empty, broadcast_q);
     }
 
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
_instance_to_package_queue[id];
+    for (; !q.empty(); q.pop()) {
+        if (q.front().block) {
+            COUNTER_UPDATE(_parent->memory_used_counter(), 
-q.front().block->ByteSizeLong());
+        }
+    }
+
     {
         std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
         swap(empty, q);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 85c87df8f4d..1561c105298 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -91,7 +91,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
             
channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]);
         }
     }
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     // Make sure brpc stub is ready before execution.
     for (int i = 0; i < channels.size(); ++i) {
@@ -126,7 +125,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<ExchangeSinkOperatorX>();
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == 
TPartitionType::RANDOM ||
         _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
@@ -348,7 +346,6 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
 Status ExchangeSinkOperatorX::open(RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
     _state = state;
-    _mem_tracker = std::make_unique<MemTracker>("ExchangeSinkOperatorX:");
     _compression_type = state->fragement_transmission_compression_type();
     if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
         if (_output_tuple_id == -1) {
@@ -377,7 +374,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
     auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
     SCOPED_TIMER(local_state.exec_time_counter());
-    
local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
     bool all_receiver_eof = true;
     for (auto& channel : local_state.channels) {
         if (!channel->is_receiver_eof()) {
@@ -389,6 +385,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         return Status::EndOfFile("all data stream channels EOF");
     }
 
+    Defer defer([&]() {
+        COUNTER_SET(local_state._peak_memory_usage_counter,
+                    local_state._memory_used_counter->value());
+    });
     if (_part_type == TPartitionType::UNPARTITIONED || 
local_state.channels.size() == 1) {
         // 1. serialize depends on it is not local exchange
         // 2. send block
@@ -412,7 +412,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         } else {
             auto block_holder = 
vectorized::BroadcastPBlockHolder::create_shared();
             {
-                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 bool serialized = false;
                 RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
                         block, block_holder->get_block(), 
local_state._rpc_channels_num,
@@ -444,7 +443,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                                         idx == 
local_state._last_local_channel_idx);
                                 moved = idx == 
local_state._last_local_channel_idx;
                             } else {
-                                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                                 status = 
channel->send_broadcast_block(block_holder, eos);
                             }
                             HANDLE_CHANNEL_STATUS(state, channel, status);
@@ -470,7 +468,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                 auto status = current_channel->send_local_block(block, eos, 
true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
-                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 auto pblock = std::make_unique<PBlock>();
                 RETURN_IF_ERROR(local_state._serializer.serialize_block(block, 
pblock.get()));
                 auto status = 
current_channel->send_remote_block(std::move(pblock), eos);
@@ -484,8 +481,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         auto rows = block->rows();
         {
             SCOPED_TIMER(local_state._split_block_hash_compute_timer);
-            RETURN_IF_ERROR(
-                    local_state._partitioner->do_partitioning(state, block, 
_mem_tracker.get()));
+            RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
block));
+        }
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
         }
         if (_part_type == TPartitionType::HASH_PARTITIONED) {
             SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
@@ -498,6 +498,14 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                     state, local_state.channels, local_state._partition_count,
                     
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         }
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        COUNTER_UPDATE(local_state.memory_used_counter(),
+                       new_channel_mem_usage - old_channel_mem_usage);
+        COUNTER_SET(local_state.peak_memory_usage_counter(),
+                    local_state.memory_used_counter()->value());
     } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
         int64_t old_channel_mem_usage = 0;
         for (const auto& channel : local_state.channels) {
@@ -549,10 +557,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         COUNTER_SET(local_state.peak_memory_usage_counter(),
                     local_state.memory_used_counter()->value());
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
         {
             SCOPED_TIMER(local_state._split_block_hash_compute_timer);
-            RETURN_IF_ERROR(
-                    local_state._partitioner->do_partitioning(state, block, 
_mem_tracker.get()));
+            RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
block));
         }
         std::vector<std::vector<uint32>> assignments =
                 local_state.scale_writer_partitioning_exchanger->accept(block);
@@ -563,6 +574,14 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                                                       block, eos));
         }
 
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        COUNTER_UPDATE(local_state.memory_used_counter(),
+                       new_channel_mem_usage - old_channel_mem_usage);
+        COUNTER_SET(local_state.peak_memory_usage_counter(),
+                    local_state.memory_used_counter()->value());
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
         // 1. select channel
@@ -573,7 +592,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                 auto status = current_channel->send_local_block(block, eos, 
true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
-                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 auto pblock = std::make_unique<PBlock>();
                 RETURN_IF_ERROR(local_state._serializer.serialize_block(block, 
pblock.get()));
                 auto status = 
current_channel->send_remote_block(std::move(pblock), eos);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index bee34ad1a85..e85deb1a679 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -240,7 +240,6 @@ private:
 
     const std::vector<TPlanFragmentDestination> _dests;
 
-    std::unique_ptr<MemTracker> _mem_tracker;
     // Identifier of the destination plan node.
     const PlanNodeId _dest_node_id;
 
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index deafb361c57..572e848d569 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -63,8 +63,8 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
-            state, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(), p.num_senders(),
-            profile(), p.is_merging());
+            state, this, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(),
+            p.num_senders(), profile(), p.is_merging());
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
@@ -186,8 +186,6 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Block
             block->set_num_rows(limit);
             local_state.set_num_rows_returned(_limit);
         }
-        COUNTER_SET(local_state.rows_returned_counter(), 
local_state.num_rows_returned());
-        COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp 
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 8da335f4fa2..9b059831842 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -46,8 +46,6 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* 
state) {
     _vpartition = std::make_unique<doris::VOlapTablePartitionParam>(p._schema, 
p._partition);
     RETURN_IF_ERROR(_vpartition->init());
     _state = state;
-    // profile must add to state's object pool
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     _block_convertor = 
std::make_unique<vectorized::OlapTableBlockConvertor>(p._output_tuple_desc);
     _block_convertor->init_autoinc_info(p._schema->db_id(), 
p._schema->table_id(),
@@ -288,7 +286,6 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* 
state, vectorized::Bloc
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
-    SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get());
     if (!local_state._load_block_queue) {
         RETURN_IF_ERROR(local_state._initialize_load_queue());
     }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 4fa9f9a95a6..46272ea1328 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -74,11 +74,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
 
     _runtime_filter_init_timer = ADD_TIMER(profile(), "RuntimeFilterInitTime");
     _build_blocks_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", 
TUnit::BYTES, "MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildBlocks", 
TUnit::BYTES, 1);
     _hash_table_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, 
"MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable", 
TUnit::BYTES, 1);
     _build_arena_memory_usage =
-            profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, 
"MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildKeyArena", 
TUnit::BYTES, 1);
 
     // Build phase
     auto* record_profile = _should_build_hash_table ? profile() : 
faker_runtime_profile();
@@ -302,41 +302,41 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
     // Get the key column that needs to be built
     Status st = _extract_join_column(block, null_map_val, raw_ptrs, 
_build_col_ids);
 
-    st = std::visit(
-            vectorized::Overload {
-                    [&](std::monostate& arg, auto join_op, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side,
-                        auto with_other_conjuncts) -> Status {
-                        LOG(FATAL) << "FATAL: uninited hash table";
-                        __builtin_unreachable();
-                        return Status::OK();
-                    },
-                    [&](auto&& arg, auto&& join_op, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side,
-                        auto with_other_conjuncts) -> Status {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        using JoinOpType = std::decay_t<decltype(join_op)>;
-                        ProcessHashTableBuild<HashTableCtxType> 
hash_table_build_process(
-                                rows, raw_ptrs, this, state->batch_size(), 
state);
-                        auto old_hash_table_size = 
arg.hash_table->get_byte_size();
-                        auto old_key_size = arg.serialized_keys_size(true);
-                        auto st = hash_table_build_process.template run<
-                                JoinOpType::value, has_null_value,
-                                short_circuit_for_null_in_build_side, 
with_other_conjuncts>(
-                                arg,
-                                has_null_value || 
short_circuit_for_null_in_build_side
-                                        ? &null_map_val->get_data()
-                                        : nullptr,
-                                &_shared_state->_has_null_in_build_side);
-                        _mem_tracker->consume(arg.hash_table->get_byte_size() -
-                                              old_hash_table_size);
-                        _mem_tracker->consume(arg.serialized_keys_size(true) - 
old_key_size);
-                        return st;
-                    }},
-            *_shared_state->hash_table_variants, 
_shared_state->join_op_variants,
-            vectorized::make_bool_variant(_build_side_ignore_null),
-            
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side),
-            vectorized::make_bool_variant((p._have_other_join_conjunct)));
+    st = std::visit(vectorized::Overload {
+                            [&](std::monostate& arg, auto join_op, auto 
has_null_value,
+                                auto short_circuit_for_null_in_build_side,
+                                auto with_other_conjuncts) -> Status {
+                                LOG(FATAL) << "FATAL: uninited hash table";
+                                __builtin_unreachable();
+                                return Status::OK();
+                            },
+                            [&](auto&& arg, auto&& join_op, auto 
has_null_value,
+                                auto short_circuit_for_null_in_build_side,
+                                auto with_other_conjuncts) -> Status {
+                                using HashTableCtxType = 
std::decay_t<decltype(arg)>;
+                                using JoinOpType = 
std::decay_t<decltype(join_op)>;
+                                ProcessHashTableBuild<HashTableCtxType> 
hash_table_build_process(
+                                        rows, raw_ptrs, this, 
state->batch_size(), state);
+                                auto st = hash_table_build_process.template 
run<
+                                        JoinOpType::value, has_null_value,
+                                        short_circuit_for_null_in_build_side, 
with_other_conjuncts>(
+                                        arg,
+                                        has_null_value || 
short_circuit_for_null_in_build_side
+                                                ? &null_map_val->get_data()
+                                                : nullptr,
+                                        
&_shared_state->_has_null_in_build_side);
+                                COUNTER_SET(_memory_used_counter,
+                                            
_build_blocks_memory_usage->value() +
+                                                    
(int64_t)(arg.hash_table->get_byte_size() +
+                                                              
arg.serialized_keys_size(true)));
+                                COUNTER_SET(_peak_memory_usage_counter,
+                                            _memory_used_counter->value());
+                                return st;
+                            }},
+                    *_shared_state->hash_table_variants, 
_shared_state->join_op_variants,
+                    vectorized::make_bool_variant(_build_side_ignore_null),
+                    
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side),
+                    
vectorized::make_bool_variant((p._have_other_join_conjunct)));
 
     return st;
 }
@@ -532,7 +532,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     if (local_state._should_build_hash_table) {
         // If eos or have already met a null value using short-circuit 
strategy, we do not need to pull
         // data from probe side.
-        local_state._build_side_mem_used += in_block->allocated_bytes();
 
         if (local_state._build_side_mutable_block.empty()) {
             auto tmp_build_block = 
vectorized::VectorizedUtils::create_empty_columnswithtypename(
@@ -559,12 +558,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                         std::to_string(std::numeric_limits<uint32_t>::max()));
             }
 
-            local_state._mem_tracker->consume(in_block->bytes());
-            COUNTER_UPDATE(local_state._build_blocks_memory_usage, 
in_block->bytes());
-
             SCOPED_TIMER(local_state._build_side_merge_block_timer);
             
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
                     std::move(*in_block)));
+            int64_t blocks_mem_usage = 
local_state._build_side_mutable_block.allocated_bytes();
+            COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage);
+            COUNTER_SET(local_state._peak_memory_usage_counter, 
blocks_mem_usage);
+            COUNTER_SET(local_state._build_blocks_memory_usage, 
blocks_mem_usage);
         }
     }
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 4833bee5488..f8634ac4c49 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -74,8 +74,6 @@ protected:
     std::vector<vectorized::ColumnPtr> _key_columns_holder;
 
     bool _should_build_hash_table = true;
-    int64_t _build_side_mem_used = 0;
-    int64_t _build_side_last_mem_used = 0;
 
     size_t _build_side_rows = 0;
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 4f9184db8a5..e3afa65ae46 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -54,7 +54,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, 
LocalStateInfo& info)
     _construct_mutable_join_block();
     _probe_column_disguise_null.reserve(_probe_expr_ctxs.size());
     _probe_arena_memory_usage =
-            profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, 
"MemoryUsage", 1);
+            profile()->AddHighWaterMarkCounter("MemoryUsageProbeKeyArena", 
TUnit::BYTES, "", 1);
     // Probe phase
     _probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
     _search_hashtable_timer = ADD_TIMER(profile(), 
"ProbeWhenSearchHashTableTime");
@@ -301,8 +301,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                                                  mutable_join_block, 
&temp_block,
                                                  
local_state._probe_block.rows(), _is_mark_join,
                                                  _have_other_join_conjunct);
-                            local_state._mem_tracker->set_consumption(
-                                    arg.serialized_keys_size(false));
                         } else {
                             st = Status::InternalError("uninited hash table");
                         }
@@ -498,6 +496,10 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, 
vectorized::Block* inpu
 
         if (&local_state._probe_block != input_block) {
             input_block->swap(local_state._probe_block);
+            COUNTER_SET(local_state._memory_used_counter,
+                        (int64_t)local_state._probe_block.allocated_bytes());
+            COUNTER_SET(local_state._peak_memory_usage_counter,
+                        local_state._memory_used_counter->value());
         }
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h 
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 2ccc9aec8c7..7a5c34fb845 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -131,7 +131,6 @@ struct ProcessHashTableProbe {
     bool _need_calculate_build_index_has_zero = true;
     bool* _has_null_in_build_side;
 
-    RuntimeProfile::Counter* _rows_returned_counter = nullptr;
     RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
     RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
     RuntimeProfile::Counter* _build_side_output_timer = nullptr;
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 5de033b63e8..8123badb726 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -51,7 +51,6 @@ 
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState
           _left_output_slot_flags(parent->left_output_slot_flags()),
           _right_output_slot_flags(parent->right_output_slot_flags()),
           _has_null_in_build_side(parent->has_null_in_build_side()),
-          _rows_returned_counter(parent->_rows_returned_counter),
           _search_hashtable_timer(parent->_search_hashtable_timer),
           _init_probe_side_timer(parent->_init_probe_side_timer),
           _build_side_output_timer(parent->_build_side_output_timer),
@@ -177,8 +176,10 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
                                             false, 
hash_table_ctx.hash_table->get_bucket_size());
         hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums,
                                                   need_judge_null ? null_map : 
nullptr);
-        COUNTER_SET(_parent->_probe_arena_memory_usage,
-                    (int64_t)hash_table_ctx.serialized_keys_size(false));
+        int64_t arena_memory_usage = 
hash_table_ctx.serialized_keys_size(false);
+        COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage);
+        COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage);
+        COUNTER_SET(_parent->_peak_memory_usage_counter, 
_parent->_memory_used_counter->value());
     }
 
     return typename HashTableType::State(_parent->_probe_columns);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 304e8e96f0c..c91dccb873f 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -106,7 +106,6 @@ Status 
MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
                 local_state._output_expr_contexts, *output_block, block, 
true));
         vectorized::materialize_block_inplace(*block);
     }
-    COUNTER_UPDATE(local_state._rows_returned_counter, block->rows());
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 79f3cbdf17b..3b5174d87c0 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -322,17 +322,28 @@ Status 
OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::
         }
     });
 
+    Status status;
     auto* local_state = state->get_local_state(operator_id());
+    Defer defer([&]() {
+        if (status.ok()) {
+            if (auto rows = block->rows()) {
+                COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
+                COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
+            }
+        }
+    });
     if (_output_row_descriptor) {
         local_state->clear_origin_block();
-        auto status = get_block(state, &local_state->_origin_block, eos);
+        status = get_block(state, &local_state->_origin_block, eos);
         if (UNLIKELY(!status.ok())) {
             return status;
         }
-        return do_projections(state, &local_state->_origin_block, block);
+        status = do_projections(state, &local_state->_origin_block, block);
+        return status;
     }
-    
local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption());
-    return get_block(state, block, eos);
+    status = get_block(state, block, eos);
+    
local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value());
+    return status;
 }
 
 void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* 
eos) {
@@ -353,8 +364,6 @@ void 
PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
 
     if (auto rows = block->rows()) {
         _num_rows_returned += rows;
-        COUNTER_UPDATE(_blocks_returned_counter, 1);
-        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     }
 }
 
@@ -475,10 +484,9 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
     _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
-    _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + 
_runtime_profile->name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, 
"MemoryUsage", 1);
-    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
-            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
+    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, 
"MemoryUsage", TUnit::BYTES, 1);
+    _peak_memory_usage_counter =
+            _runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak", 
TUnit::BYTES, "", 1);
     return Status::OK();
 }
 
@@ -511,11 +519,8 @@ Status 
PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
     if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     }
-    if (_rows_returned_counter != nullptr) {
-        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-    }
     if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+        _peak_memory_usage_counter->set(_memory_used_counter->value());
     }
     _closed = true;
     // Some kinds of source operators has a 1-1 relationship with a sink 
operator (such as AnalyticOperator).
@@ -555,10 +560,9 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
     _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
     info.parent_profile->add_child(_profile, true, nullptr);
-    _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, 
"MemoryUsage", 1);
+    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 
TUnit::BYTES, 1);
     _peak_memory_usage_counter =
-            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, 
"MemoryUsage", 1);
+            _profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, 
"", 1);
     return Status::OK();
 }
 
@@ -571,7 +575,7 @@ Status 
PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     }
     if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+        _peak_memory_usage_counter->set(_memory_used_counter->value());
     }
     _closed = true;
     return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 301f7599737..25c3a68a9f2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -163,10 +163,9 @@ public:
     void reached_limit(vectorized::Block* block, bool* eos);
     RuntimeProfile* profile() { return _runtime_profile.get(); }
 
-    MemTracker* mem_tracker() { return _mem_tracker.get(); }
-    RuntimeProfile::Counter* rows_returned_counter() { return 
_rows_returned_counter; }
-    RuntimeProfile::Counter* blocks_returned_counter() { return 
_blocks_returned_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
+    RuntimeProfile::Counter* memory_used_counter() { return 
_memory_used_counter; }
+    RuntimeProfile::Counter* peak_memory_usage_counter() { return 
_peak_memory_usage_counter; }
     OperatorXBase* parent() { return _parent; }
     RuntimeState* state() { return _state; }
     vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
@@ -188,16 +187,14 @@ public:
 
 protected:
     friend class OperatorXBase;
+    template <typename LocalStateType>
+    friend class ScanOperatorX;
 
     ObjectPool* _pool = nullptr;
     int64_t _num_rows_returned {0};
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
-    // Record this node memory size. it is expected that artificial guarantees 
are accurate,
-    // which will providea reference for operator memory.
-    std::unique_ptr<MemTracker> _mem_tracker;
-
     std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 
     RuntimeProfile::Counter* _rows_returned_counter = nullptr;
@@ -342,7 +339,6 @@ public:
     DataSinkOperatorXBase* parent() { return _parent; }
     RuntimeState* state() { return _state; }
     RuntimeProfile* profile() { return _profile; }
-    MemTracker* mem_tracker() { return _mem_tracker.get(); }
     [[nodiscard]] RuntimeProfile* faker_runtime_profile() const {
         return _faker_runtime_profile.get();
     }
@@ -350,9 +346,7 @@ public:
     RuntimeProfile::Counter* rows_input_counter() { return 
_rows_input_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
     RuntimeProfile::Counter* memory_used_counter() { return 
_memory_used_counter; }
-    RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
-        return _peak_memory_usage_counter;
-    }
+    RuntimeProfile::Counter* peak_memory_usage_counter() { return 
_peak_memory_usage_counter; }
     virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
 
     // override in exchange sink , AsyncWriterSink
@@ -364,7 +358,6 @@ protected:
     DataSinkOperatorXBase* _parent = nullptr;
     RuntimeState* _state = nullptr;
     RuntimeProfile* _profile = nullptr;
-    std::unique_ptr<MemTracker> _mem_tracker;
     // Set to true after close() has been called. subclasses should check and 
set this in
     // close().
     bool _closed = false;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index fbabdbdc8f8..eba9e8472d3 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -113,9 +113,9 @@ Status PartitionSortSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     _partition_exprs_num = p._partition_exprs_num;
     _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", 
TUnit::UNIT);
     _serialize_key_arena_memory_usage =
-            _profile->AddHighWaterMarkCounter("SerializeKeyArena", 
TUnit::BYTES, "MemoryUsage", 1);
+            _profile->AddHighWaterMarkCounter("MemoryUsageSerializeKeyArena", 
TUnit::BYTES, "", 1);
     _hash_table_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES, 
"MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageHashTable", 
TUnit::BYTES, 1);
     _build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
     _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
     _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp 
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index f2cd8dea0b9..6d355477ab8 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -58,7 +58,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::
                 }
             }
             if (!output_block->empty()) {
-                COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
                 local_state._num_rows_returned += output_block->rows();
             }
             return Status::OK();
@@ -80,7 +79,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::
                local_state._sort_idx >= 
local_state._shared_state->partition_sorts.size();
     }
     if (!output_block->empty()) {
-        COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
         local_state._num_rows_returned += output_block->rows();
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 469716b7a22..ab0a43f4a63 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -81,10 +81,10 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* 
state, Status exec_stat
 void PartitionedAggSinkLocalState::_init_counters() {
     _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
 
-    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"HashTable",
-                                                            TUnit::BYTES, 
"MemoryUsage", 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", 
TUnit::BYTES, 1);
     _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
-            "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+            "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);
 
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
@@ -110,8 +110,8 @@ void PartitionedAggSinkLocalState::_init_counters() {
     } while (false)
 
 void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* 
child_profile) {
-    UPDATE_PROFILE(_hash_table_memory_usage, "HashTable");
-    UPDATE_PROFILE(_serialize_key_arena_memory_usage, "SerializeKeyArena");
+    UPDATE_PROFILE(_hash_table_memory_usage, "MemoryUsageHashTable");
+    UPDATE_PROFILE(_serialize_key_arena_memory_usage, 
"MemoryUsageSerializeKeyArena");
     UPDATE_PROFILE(_build_timer, "BuildTime");
     UPDATE_PROFILE(_serialize_key_timer, "SerializeKeyTime");
     UPDATE_PROFILE(_merge_timer, "MergeTime");
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 018d63a6dee..0e56acc1c57 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -557,8 +557,7 @@ Status 
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
     }
     {
         SCOPED_TIMER(local_state._partition_timer);
-        RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
input_block,
-                                                                  
local_state._mem_tracker.get()));
+        RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
input_block));
     }
 
     std::vector<std::vector<uint32_t>> partition_indexes(_partition_count);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 32af13ba548..e3ca74c2d53 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -80,7 +80,7 @@ size_t 
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
             if (inner_sink_state_) {
                 auto inner_sink_state =
                         
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
-                return inner_sink_state->_build_side_mem_used;
+                return inner_sink_state->_build_blocks_memory_usage->value();
             }
         }
         return 0;
@@ -161,7 +161,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
             {
                 SCOPED_TIMER(_partition_timer);
-                (void)_partitioner->do_partitioning(state, &sub_block, 
_mem_tracker.get());
+                (void)_partitioner->do_partitioning(state, &sub_block);
             }
 
             const auto* channel_ids = 
_partitioner->get_channel_ids().get<uint32_t>();
@@ -334,7 +334,7 @@ Status 
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
     {
         /// TODO: DO NOT execute build exprs twice(when partition and building 
hash table)
         SCOPED_TIMER(_partition_timer);
-        RETURN_IF_ERROR(_partitioner->do_partitioning(state, in_block, 
_mem_tracker.get()));
+        RETURN_IF_ERROR(_partitioner->do_partitioning(state, in_block));
     }
 
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index cd707ccc49f..5c94d43f0d1 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -248,7 +248,6 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, 
vectorized::Block* outp
     }
     *eos = _child_eos && _child_block.rows() == 0;
     local_state.reached_limit(output_block, eos);
-    COUNTER_SET(local_state._rows_returned_counter, 
local_state._num_rows_returned);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 7c774a5aaa0..5d41c800383 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -358,7 +358,15 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
     Status get_block_after_projects(RuntimeState* state, vectorized::Block* 
block,
                                     bool* eos) override {
-        return get_block(state, block, eos);
+        Status status = get_block(state, block, eos);
+        if (status.ok()) {
+            if (auto rows = block->rows()) {
+                auto* local_state = state->get_local_state(operator_id());
+                COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
+                COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
+            }
+        }
+        return status;
     }
     [[nodiscard]] bool is_source() const override { return true; }
 
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 6f67262ef1f..ad7d7297109 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -31,7 +31,7 @@ Status SortSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     _sort_blocks_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, 
"MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", 
TUnit::BYTES, 1);
     _append_blocks_timer = ADD_TIMER(profile(), "AppendBlockTime");
     _update_runtime_predicate_timer = ADD_TIMER(profile(), 
"UpdateRuntimePredicateTime");
     return Status::OK();
@@ -121,12 +121,14 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block* in
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     if (in_block->rows() > 0) {
-        COUNTER_UPDATE(local_state._sort_blocks_memory_usage, 
(int64_t)in_block->bytes());
         {
             SCOPED_TIMER(local_state._append_blocks_timer);
             
RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
         }
-        
local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size());
+        int64_t data_size = local_state._shared_state->sorter->data_size();
+        COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
+        COUNTER_SET(local_state._memory_used_counter, data_size);
+        COUNTER_SET(local_state._peak_memory_usage_counter, data_size);
         RETURN_IF_CANCELLED(state);
 
         if (state->get_query_ctx()->has_runtime_predicate(_node_id)) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 4bf1ab04efb..130affd2f4b 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -51,7 +51,7 @@ void SpillSortSinkLocalState::_init_counters() {
     _partial_sort_timer = ADD_TIMER(_profile, "PartialSortTime");
     _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime");
     _sort_blocks_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, 
"MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", 
TUnit::BYTES, 1);
 
     _spill_merge_sort_timer =
             ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", 
"Spill", 1);
@@ -70,7 +70,7 @@ void SpillSortSinkLocalState::_init_counters() {
 void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) {
     UPDATE_PROFILE(_partial_sort_timer, "PartialSortTime");
     UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime");
-    UPDATE_PROFILE(_sort_blocks_memory_usage, "SortBlocks");
+    UPDATE_PROFILE(_sort_blocks_memory_usage, "MemoryUsageSortBlocks");
 }
 
 Status SpillSortSinkLocalState::close(RuntimeState* state, Status 
execsink_status) {
@@ -156,8 +156,11 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::sink",
                     { return Status::InternalError("fault_inject 
spill_sort_sink sink failed"); });
     
RETURN_IF_ERROR(_sort_sink_operator->sink(local_state._runtime_state.get(), 
in_block, false));
-    local_state._mem_tracker->set_consumption(
-            
local_state._shared_state->in_mem_shared_state->sorter->data_size());
+    int64_t data_size = 
local_state._shared_state->in_mem_shared_state->sorter->data_size();
+    COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
+    COUNTER_SET(local_state._memory_used_counter, data_size);
+    COUNTER_SET(local_state._peak_memory_usage_counter, data_size);
+
     if (eos) {
         if (local_state._shared_state->is_spilled) {
             if (revocable_mem_size(state) > 0) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 9ead9c37b17..ab969a129e6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -87,10 +87,10 @@ Status StreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_init_timer);
-    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"HashTable",
-                                                            TUnit::BYTES, 
"MemoryUsage", 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", 
TUnit::BYTES, 1);
     _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
-            "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+            "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);
 
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
@@ -350,10 +350,10 @@ Status 
StreamingAggLocalState::_merge_without_key(vectorized::Block* block) {
 }
 
 void StreamingAggLocalState::_update_memusage_without_key() {
-    auto arena_memory_usage = _agg_arena_pool->size() - 
_mem_usage_record.used_in_arena;
-    Base::_mem_tracker->consume(arena_memory_usage);
-    _serialize_key_arena_memory_usage->add(arena_memory_usage);
-    _mem_usage_record.used_in_arena = _agg_arena_pool->size();
+    int64_t arena_memory_usage = _agg_arena_pool->size();
+    COUNTER_SET(_memory_used_counter, arena_memory_usage);
+    COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
+    COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
 }
 
 Status StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block* 
block) {
@@ -365,28 +365,25 @@ Status 
StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block* b
 }
 
 void StreamingAggLocalState::_update_memusage_with_serialized_key() {
-    std::visit(
-            vectorized::Overload {
-                    [&](std::monostate& arg) -> void {
-                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                    },
-                    [&](auto& agg_method) -> void {
-                        auto& data = *agg_method.hash_table;
-                        auto arena_memory_usage = _agg_arena_pool->size() +
-                                                  
_aggregate_data_container->memory_usage() -
-                                                  
_mem_usage_record.used_in_arena;
-                        Base::_mem_tracker->consume(arena_memory_usage);
-                        
Base::_mem_tracker->consume(data.get_buffer_size_in_bytes() -
-                                                    
_mem_usage_record.used_in_state);
-                        
_serialize_key_arena_memory_usage->add(arena_memory_usage);
-                        COUNTER_UPDATE(
-                                _hash_table_memory_usage,
-                                data.get_buffer_size_in_bytes() - 
_mem_usage_record.used_in_state);
-                        _mem_usage_record.used_in_state = 
data.get_buffer_size_in_bytes();
-                        _mem_usage_record.used_in_arena =
-                                _agg_arena_pool->size() + 
_aggregate_data_container->memory_usage();
-                    }},
-            _agg_data->method_variant);
+    std::visit(vectorized::Overload {
+                       [&](std::monostate& arg) -> void {
+                           throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                       },
+                       [&](auto& agg_method) -> void {
+                           auto& data = *agg_method.hash_table;
+                           int64_t arena_memory_usage = 
_agg_arena_pool->size() +
+                                                        
_aggregate_data_container->memory_usage();
+                           int64_t hash_table_memory_usage = 
data.get_buffer_size_in_bytes();
+
+                           COUNTER_SET(_memory_used_counter,
+                                       arena_memory_usage + 
hash_table_memory_usage);
+                           COUNTER_SET(_peak_memory_usage_counter,
+                                       arena_memory_usage + 
hash_table_memory_usage);
+
+                           COUNTER_SET(_serialize_key_arena_memory_usage, 
arena_memory_usage);
+                           COUNTER_SET(_hash_table_memory_usage, 
hash_table_memory_usage);
+                       }},
+               _agg_data->method_variant);
 }
 
 template <bool limit>
@@ -508,7 +505,6 @@ Status 
StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block,
     // pre stream agg need use _num_row_return to decide whether to do pre 
stream agg
     _cur_num_rows_returned += output_block->rows();
     _make_nullable_output_key(output_block);
-    //    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     _executor->update_memusage(this);
     return Status::OK();
 }
@@ -1247,7 +1243,6 @@ Status StreamingAggLocalState::close(RuntimeState* state) 
{
 
     std::vector<char> tmp_deserialize_buffer;
     _deserialize_buffer.swap(tmp_deserialize_buffer);
-    Base::_mem_tracker->release(_mem_usage_record.used_in_state + 
_mem_usage_record.used_in_arena);
 
     /// _hash_table_size_counter may be null if prepare failed.
     if (_hash_table_size_counter) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 59d5491d10c..b695880ac28 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -167,12 +167,6 @@ private:
     };
     std::unique_ptr<ExecutorBase> _executor = nullptr;
 
-    struct MemoryRecord {
-        MemoryRecord() : used_in_arena(0), used_in_state(0) {}
-        int64_t used_in_arena;
-        int64_t used_in_state;
-    };
-    MemoryRecord _mem_usage_record;
     std::unique_ptr<vectorized::Block> _child_block = nullptr;
     bool _child_eos = false;
     std::unique_ptr<vectorized::Block> _pre_aggregated_block = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 2d20b8f365c..c4832b9958c 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -26,7 +26,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     _channel_id = info.task_idx;
-    _shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
+    _shared_state->mem_counters[_channel_id] = _memory_used_counter;
     _exchanger = _shared_state->exchanger.get();
     DCHECK(_exchanger != nullptr);
     _get_block_failed_counter =
@@ -105,8 +105,8 @@ std::string LocalExchangeSourceLocalState::debug_string(int 
indentation_level) c
                    _exchanger->data_queue_debug_string(_channel_id));
     size_t i = 0;
     fmt::format_to(debug_string_buffer, ", MemTrackers: ");
-    for (auto* mem_tracker : _shared_state->mem_trackers) {
-        fmt::format_to(debug_string_buffer, "{}: {}, ", i, 
mem_tracker->consumption());
+    for (auto* mem_counter : _shared_state->mem_counters) {
+        fmt::format_to(debug_string_buffer, "{}: {}, ", i, 
mem_counter->value());
         i++;
     }
     return fmt::to_string(debug_string_buffer);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index fa34b6a4040..23f91cca631 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -118,8 +118,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block,
     }
     {
         SCOPED_TIMER(local_state._compute_hash_value_timer);
-        RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
in_block,
-                                                                  
local_state.mem_tracker()));
+        RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
in_block));
     }
     {
         SCOPED_TIMER(local_state._distribute_timer);
diff --git a/be/src/vec/runtime/partitioner.cpp 
b/be/src/vec/runtime/partitioner.cpp
index 0d6165b7555..d0c8f2d8fcd 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -25,8 +25,8 @@
 namespace doris::vectorized {
 
 template <typename HashValueType, typename ChannelIds>
-Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* 
state, Block* block,
-                                                               MemTracker* 
mem_tracker) const {
+Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* 
state,
+                                                               Block* block) 
const {
     int rows = block->rows();
 
     if (rows > 0) {
@@ -38,10 +38,7 @@ Status Partitioner<HashValueType, 
ChannelIds>::do_partitioning(RuntimeState* sta
         _hash_vals.resize(rows);
         std::fill(_hash_vals.begin(), _hash_vals.end(), 0);
         auto* __restrict hashes = _hash_vals.data();
-        {
-            SCOPED_CONSUME_MEM_TRACKER(mem_tracker);
-            RETURN_IF_ERROR(_get_partition_column_result(block, result));
-        }
+        RETURN_IF_ERROR(_get_partition_column_result(block, result));
         for (int j = 0; j < result_size; ++j) {
             
_do_hash(unpack_if_const(block->get_by_position(result[j]).column).first, 
hashes, j);
         }
@@ -50,10 +47,7 @@ Status Partitioner<HashValueType, 
ChannelIds>::do_partitioning(RuntimeState* sta
             hashes[i] = ChannelIds()(hashes[i], _partition_count);
         }
 
-        {
-            SCOPED_CONSUME_MEM_TRACKER(mem_tracker);
-            Block::erase_useless_column(block, column_to_keep);
-        }
+        Block::erase_useless_column(block, column_to_keep);
     }
     return Status::OK();
 }
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index 3152edb5cb5..5607a83327b 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -48,8 +48,7 @@ public:
 
     virtual Status open(RuntimeState* state) = 0;
 
-    virtual Status do_partitioning(RuntimeState* state, Block* block,
-                                   MemTracker* mem_tracker) const = 0;
+    virtual Status do_partitioning(RuntimeState* state, Block* block) const = 
0;
 
     virtual ChannelField get_channel_ids() const = 0;
 
@@ -75,8 +74,7 @@ public:
 
     Status open(RuntimeState* state) override { return 
VExpr::open(_partition_expr_ctxs, state); }
 
-    Status do_partitioning(RuntimeState* state, Block* block,
-                           MemTracker* mem_tracker) const override;
+    Status do_partitioning(RuntimeState* state, Block* block) const override;
 
     ChannelField get_channel_ids() const override {
         return {_hash_vals.data(), sizeof(HashValueType)};
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index c14d119e0fe..78067b9b181 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -61,12 +61,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const 
TUniqueId& fragment_instanc
 }
 
 std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
-        RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& 
fragment_instance_id,
-        PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, 
bool is_merging) {
+        RuntimeState* state, pipeline::ExchangeLocalState* parent, const 
RowDescriptor& row_desc,
+        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+        RuntimeProfile* profile, bool is_merging) {
     DCHECK(profile != nullptr);
     VLOG_FILE << "creating receiver for fragment=" << 
print_id(fragment_instance_id)
               << ", node=" << dest_node_id;
-    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, state, 
row_desc,
+    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, parent, 
state, row_desc,
                                                                  
fragment_instance_id, dest_node_id,
                                                                  num_senders, 
is_merging, profile));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h 
b/be/src/vec/runtime/vdata_stream_mgr.h
index 09e347fcfb2..bd5e6f9b91e 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -40,6 +40,9 @@ class RuntimeState;
 class RowDescriptor;
 class RuntimeProfile;
 class PTransmitDataParams;
+namespace pipeline {
+class ExchangeLocalState;
+}
 
 namespace vectorized {
 class VDataStreamRecvr;
@@ -50,6 +53,7 @@ public:
     ~VDataStreamMgr();
 
     std::shared_ptr<VDataStreamRecvr> create_recvr(RuntimeState* state,
+                                                   
pipeline::ExchangeLocalState* parent,
                                                    const RowDescriptor& 
row_desc,
                                                    const TUniqueId& 
fragment_instance_id,
                                                    PlanNodeId dest_node_id, 
int num_senders,
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 1ca6bb7f2c5..f1dfbbf3047 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -96,6 +96,7 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
     DCHECK(!_block_queue.empty());
     auto [next_block, block_byte_size] = std::move(_block_queue.front());
     _block_queue.pop_front();
+    _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
     sub_blocks_memory_usage(block_byte_size);
     _record_debug_info();
     if (_block_queue.empty() && _source_dependency) {
@@ -207,6 +208,9 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
+    _recvr->_parent->memory_used_counter()->update(block_byte_size);
+    _recvr->_parent->peak_memory_usage_counter()->set(
+            _recvr->_parent->memory_used_counter()->value());
     add_blocks_memory_usage(block_byte_size);
     return Status::OK();
 }
@@ -245,6 +249,9 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
         _record_debug_info();
         try_set_dep_ready_without_lock();
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
+        _recvr->_parent->memory_used_counter()->update(block_mem_size);
+        _recvr->_parent->peak_memory_usage_counter()->set(
+                _recvr->_parent->memory_used_counter()->value());
         add_blocks_memory_usage(block_mem_size);
     }
 }
@@ -315,12 +322,13 @@ void VDataStreamRecvr::SenderQueue::close() {
     _block_queue.clear();
 }
 
-VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* 
state,
-                                   const RowDescriptor& row_desc,
+VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, 
pipeline::ExchangeLocalState* parent,
+                                   RuntimeState* state, const RowDescriptor& 
row_desc,
                                    const TUniqueId& fragment_instance_id, 
PlanNodeId dest_node_id,
                                    int num_senders, bool is_merging, 
RuntimeProfile* profile)
         : HasTaskExecutionCtx(state),
           _mgr(stream_mgr),
+          _parent(parent),
           _query_thread_context(state->query_id(), state->query_mem_tracker(),
                                 state->get_query_ctx()->workload_group()),
           _fragment_instance_id(fragment_instance_id),
@@ -352,9 +360,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, RuntimeState* sta
     }
 
     // Initialize the counters
-    _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
-    _peak_memory_usage_counter =
-            _profile->add_counter("PeakMemoryUsage", TUnit::BYTES, 
"MemoryUsage");
     _remote_bytes_received_counter = ADD_COUNTER(_profile, 
"RemoteBytesReceived", TUnit::BYTES);
     _local_bytes_received_counter = ADD_COUNTER(_profile, 
"LocalBytesReceived", TUnit::BYTES);
 
@@ -417,7 +422,6 @@ std::shared_ptr<pipeline::Dependency> 
VDataStreamRecvr::get_local_channel_depend
 }
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
-    _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     if (!_is_merging) {
         block->clear();
         return _sender_queues[0]->get_batch(block, eos);
@@ -492,9 +496,6 @@ void VDataStreamRecvr::close() {
     _mgr = nullptr;
 
     _merger.reset();
-    if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
-    }
 }
 
 void VDataStreamRecvr::set_sink_dep_always_ready() const {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index e8dcfdedba5..b2d76590ba2 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -69,7 +69,8 @@ class VDataStreamRecvr;
 class VDataStreamRecvr : public HasTaskExecutionCtx {
 public:
     class SenderQueue;
-    VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const 
RowDescriptor& row_desc,
+    VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* 
parent,
+                     RuntimeState* state, const RowDescriptor& row_desc,
                      const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id,
                      int num_senders, bool is_merging, RuntimeProfile* 
profile);
 
@@ -120,6 +121,8 @@ private:
     // DataStreamMgr instance used to create this recvr. (Not owned)
     VDataStreamMgr* _mgr = nullptr;
 
+    pipeline::ExchangeLocalState* _parent = nullptr;
+
     QueryThreadContext _query_thread_context;
 
     // Fragment and node id of the destination exchange node this receiver is 
used by.
@@ -152,8 +155,6 @@ private:
     RuntimeProfile::Counter* _data_arrival_timer = nullptr;
     RuntimeProfile::Counter* _decompress_timer = nullptr;
     RuntimeProfile::Counter* _decompress_bytes = nullptr;
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
-    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
 
     // Number of rows received
     RuntimeProfile::Counter* _rows_produced_counter = nullptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index a458d7f5ef1..e169d57e05e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -253,12 +253,10 @@ Status BlockSerializer::next_serialized_block(Block* 
block, PBlock* dest, int nu
                                               bool* serialized, bool eos,
                                               const std::vector<uint32_t>* 
rows) {
     if (_mutable_block == nullptr) {
-        SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         _mutable_block = MutableBlock::create_unique(block->clone_empty());
     }
 
     {
-        SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         if (rows) {
             if (!rows->empty()) {
                 const auto* begin = rows->data();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to