This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1d52edc1ced [fix](counters) fix MemoryUsage and PeakMemoryUsage
counters of some operators (#41602) (#41886)
1d52edc1ced is described below
commit 1d52edc1ced3eb14b209f906564874a463e57f7f
Author: TengJianPing <[email protected]>
AuthorDate: Mon Jan 13 17:47:25 2025 +0800
[fix](counters) fix MemoryUsage and PeakMemoryUsage counters of some
operators (#41602) (#41886)
BP #41602
---
be/src/pipeline/dependency.cpp | 2 +-
be/src/pipeline/dependency.h | 13 ++--
be/src/pipeline/exec/aggregation_sink_operator.cpp | 46 +++++-------
be/src/pipeline/exec/aggregation_sink_operator.h | 2 +-
.../pipeline/exec/aggregation_source_operator.cpp | 4 --
be/src/pipeline/exec/analytic_sink_operator.cpp | 5 +-
be/src/pipeline/exec/analytic_sink_operator.h | 1 +
be/src/pipeline/exec/analytic_source_operator.cpp | 1 -
be/src/pipeline/exec/assert_num_rows_operator.cpp | 2 -
be/src/pipeline/exec/cache_source_operator.cpp | 1 -
.../distinct_streaming_aggregation_operator.cpp | 1 -
be/src/pipeline/exec/exchange_sink_buffer.cpp | 16 +++++
be/src/pipeline/exec/exchange_sink_operator.cpp | 42 +++++++----
be/src/pipeline/exec/exchange_sink_operator.h | 1 -
be/src/pipeline/exec/exchange_source_operator.cpp | 6 +-
.../exec/group_commit_block_sink_operator.cpp | 3 -
be/src/pipeline/exec/hashjoin_build_sink.cpp | 84 +++++++++++-----------
be/src/pipeline/exec/hashjoin_build_sink.h | 2 -
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 8 ++-
.../pipeline/exec/join/process_hash_table_probe.h | 1 -
.../exec/join/process_hash_table_probe_impl.h | 7 +-
.../exec/multi_cast_data_stream_source.cpp | 1 -
be/src/pipeline/exec/operator.cpp | 40 ++++++-----
be/src/pipeline/exec/operator.h | 17 ++---
.../pipeline/exec/partition_sort_sink_operator.cpp | 4 +-
.../exec/partition_sort_source_operator.cpp | 2 -
.../exec/partitioned_aggregation_sink_operator.cpp | 10 +--
.../exec/partitioned_hash_join_probe_operator.cpp | 3 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 6 +-
be/src/pipeline/exec/repeat_operator.cpp | 1 -
be/src/pipeline/exec/scan_operator.h | 10 ++-
be/src/pipeline/exec/sort_sink_operator.cpp | 8 ++-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 11 +--
.../exec/streaming_aggregation_operator.cpp | 57 +++++++--------
.../pipeline/exec/streaming_aggregation_operator.h | 6 --
.../local_exchange_source_operator.cpp | 6 +-
be/src/pipeline/local_exchange/local_exchanger.cpp | 3 +-
be/src/vec/runtime/partitioner.cpp | 14 ++--
be/src/vec/runtime/partitioner.h | 6 +-
be/src/vec/runtime/vdata_stream_mgr.cpp | 7 +-
be/src/vec/runtime/vdata_stream_mgr.h | 4 ++
be/src/vec/runtime/vdata_stream_recvr.cpp | 19 ++---
be/src/vec/runtime/vdata_stream_recvr.h | 7 +-
be/src/vec/sink/vdata_stream_sender.cpp | 2 -
44 files changed, 246 insertions(+), 246 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 560efec94e1..93117fa71a0 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -199,7 +199,7 @@ void LocalExchangeSharedState::sub_running_source_operators(
LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
source_deps.resize(num_instances, nullptr);
- mem_trackers.resize(num_instances, nullptr);
+ mem_counters.resize(num_instances, nullptr);
}
vectorized::MutableColumns AggSharedState::_get_keys_hash_table() {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index c689ec7ee6a..fea6d9cb7bb 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -324,11 +324,6 @@ public:
vectorized::Sizes offsets_of_aggregate_states;
std::vector<size_t> make_nullable_keys;
- struct MemoryRecord {
- int64_t used_in_arena {};
- int64_t used_in_state {};
- };
- MemoryRecord mem_usage_record;
bool agg_data_created_without_key = false;
bool enable_spill = false;
bool reach_limit = false;
@@ -826,7 +821,7 @@ public:
LocalExchangeSharedState(int num_instances);
~LocalExchangeSharedState() override;
std::unique_ptr<ExchangerBase> exchanger {};
- std::vector<MemTracker*> mem_trackers;
+ std::vector<RuntimeProfile::Counter*> mem_counters;
std::atomic<int64_t> mem_usage = 0;
// We need to make sure to add mem_usage first and then enqueue, otherwise
sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
@@ -862,13 +857,15 @@ public:
}
void add_mem_usage(int channel_id, size_t delta, bool
update_total_mem_usage = true) {
- mem_trackers[channel_id]->consume(delta);
+ mem_counters[channel_id]->update(delta);
if (update_total_mem_usage) {
add_total_mem_usage(delta, channel_id);
}
}
- void sub_mem_usage(int channel_id, size_t delta) {
mem_trackers[channel_id]->release(delta); }
+ void sub_mem_usage(int channel_id, size_t delta) {
+ mem_counters[channel_id]->update(-(int64_t)delta);
+ }
virtual void add_total_mem_usage(size_t delta, int channel_id) {
if (mem_usage.fetch_add(delta) + delta >
config::local_exchange_buffer_mem_limit) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 627dbf4ef41..f5caaf30bfc 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
_agg_data = Base::_shared_state->agg_data.get();
_agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
_hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize",
TUnit::UNIT);
- _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"HashTable",
- TUnit::BYTES,
"MemoryUsage", 1);
- _serialize_key_arena_memory_usage =
Base::profile()->AddHighWaterMarkCounter(
- "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+ _hash_table_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable",
TUnit::BYTES, 1);
+ _serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL(
+ Base::profile(), "MemoryUsageSerializeKeyArena", TUnit::BYTES, 1);
_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
@@ -223,24 +223,17 @@ void
AggSinkLocalState::_update_memusage_with_serialized_key() {
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
- auto arena_memory_usage =
+ int64_t arena_memory_usage =
_agg_arena_pool->size() +
-
Base::_shared_state->aggregate_data_container->memory_usage() -
-
Base::_shared_state->mem_usage_record.used_in_arena;
- Base::_mem_tracker->consume(arena_memory_usage);
- Base::_mem_tracker->consume(
- data.get_buffer_size_in_bytes() -
-
Base::_shared_state->mem_usage_record.used_in_state);
-
_serialize_key_arena_memory_usage->add(arena_memory_usage);
- COUNTER_UPDATE(
- _hash_table_memory_usage,
- data.get_buffer_size_in_bytes() -
-
Base::_shared_state->mem_usage_record.used_in_state);
- Base::_shared_state->mem_usage_record.used_in_state
=
- data.get_buffer_size_in_bytes();
- Base::_shared_state->mem_usage_record.used_in_arena
=
- _agg_arena_pool->size() +
-
Base::_shared_state->aggregate_data_container->memory_usage();
+
_shared_state->aggregate_data_container->memory_usage();
+ int64_t hash_table_memory_usage =
data.get_buffer_size_in_bytes();
+
+ COUNTER_SET(_memory_used_counter,
+ arena_memory_usage +
hash_table_memory_usage);
+ COUNTER_SET(_peak_memory_usage_counter,
_memory_used_counter->value());
+
+ COUNTER_SET(_serialize_key_arena_memory_usage,
arena_memory_usage);
+ COUNTER_SET(_hash_table_memory_usage,
hash_table_memory_usage);
}},
_agg_data->method_variant);
}
@@ -419,11 +412,10 @@ Status
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
}
void AggSinkLocalState::_update_memusage_without_key() {
- auto arena_memory_usage =
- _agg_arena_pool->size() -
Base::_shared_state->mem_usage_record.used_in_arena;
- Base::_mem_tracker->consume(arena_memory_usage);
- _serialize_key_arena_memory_usage->add(arena_memory_usage);
- Base::_shared_state->mem_usage_record.used_in_arena =
_agg_arena_pool->size();
+ int64_t arena_memory_usage = _agg_arena_pool->size();
+ COUNTER_SET(_memory_used_counter, arena_memory_usage);
+ COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
+ COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}
Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block*
block) {
@@ -875,8 +867,6 @@ Status AggSinkLocalState::close(RuntimeState* state, Status
exec_status) {
std::vector<char> tmp_deserialize_buffer;
_deserialize_buffer.swap(tmp_deserialize_buffer);
-
Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state
+
-
Base::_shared_state->mem_usage_record.used_in_arena);
return Base::close(state, exec_status);
}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 975b04477f2..e5209425f76 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -106,7 +106,7 @@ protected:
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage =
nullptr;
+ RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr;
bool _should_limit_output = false;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index a406bdc329e..fbc71ac06ad 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -450,8 +450,6 @@ void AggLocalState::do_agg_limit(vectorized::Block* block,
bool* eos) {
vectorized::Block::filter_block_internal(block,
_shared_state->need_computes);
if (auto rows = block->rows()) {
_num_rows_returned += rows;
- COUNTER_UPDATE(_blocks_returned_counter, 1);
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
} else {
reached_limit(block, eos);
@@ -459,8 +457,6 @@ void AggLocalState::do_agg_limit(vectorized::Block* block,
bool* eos) {
} else {
if (auto rows = block->rows()) {
_num_rows_returned += rows;
- COUNTER_UPDATE(_blocks_returned_counter, 1);
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
}
}
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 839a485f2d9..7c59c954368 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -33,6 +33,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime");
_compute_partition_by_timer = ADD_TIMER(profile(),
"ComputePartitionByTime");
_compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime");
+ _blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile,
"MemoryUsageBlocks", TUnit::BYTES, 1);
return Status::OK();
}
@@ -322,8 +323,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Block
}
}
- COUNTER_UPDATE(local_state._memory_used_counter,
input_block->allocated_bytes());
+ int64_t block_mem_usage = input_block->allocated_bytes();
+ COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage);
COUNTER_SET(local_state._peak_memory_usage_counter,
local_state._memory_used_counter->value());
+ COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage);
//TODO: if need improvement, the is a tips to maintain a free queue,
//so the memory could reuse, no need to new/delete again;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 084998d2c36..5b08f0a6e3d 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -61,6 +61,7 @@ private:
RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
RuntimeProfile::Counter* _compute_order_by_timer = nullptr;
+ RuntimeProfile::Counter* _blocks_memory_usage = nullptr;
std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
};
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 06a6374bbae..205e6e7d3e2 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -444,7 +444,6 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos
found_partition_end) {
Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
_blocks_memory_usage->add(-block->allocated_bytes());
- mem_tracker()->consume(-block->allocated_bytes());
if (_shared_state->origin_cols.size() < block->columns()) {
block->erase_not_in(_shared_state->origin_cols);
}
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 563c4bf49ca..f4ad31a6f90 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -115,8 +115,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
return Status::Cancelled("Expected {} {} to be returned by expression
{}",
to_string_lambda(_assertion),
_desired_num_rows, _subquery_string);
}
- COUNTER_SET(local_state.rows_returned_counter(),
local_state.num_rows_returned());
- COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
block->columns()));
return Status::OK();
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp
b/be/src/pipeline/exec/cache_source_operator.cpp
index e98a18b76a3..1387de351d1 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -159,7 +159,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* b
local_state._current_query_cache_rows += output_block->rows();
auto mem_consume = output_block->allocated_bytes();
local_state._current_query_cache_bytes += mem_consume;
- local_state._mem_tracker->consume(mem_consume);
if (_cache_param.entry_max_bytes <
local_state._current_query_cache_bytes ||
_cache_param.entry_max_rows <
local_state._current_query_cache_rows) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 75b26c3ed18..2f4a046003f 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -461,7 +461,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState*
state, vectorized::Bloc
block->columns()));
}
local_state.add_num_rows_returned(block->rows());
- COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
// If the limit is not reached, it is important to ensure that
_aggregated_block is empty
// because it may still contain data.
// However, if the limit is reached, there is no need to output data even
if some exists.
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 1753991fe52..45aeda8d9d3 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -156,6 +156,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
+ COUNTER_UPDATE(_parent->memory_used_counter(),
request.block->ByteSizeLong());
+ COUNTER_SET(_parent->peak_memory_usage_counter(),
+ _parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
@@ -291,6 +294,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
if (request.block) {
static_cast<void>(brpc_request->release_block());
+ COUNTER_UPDATE(_parent->memory_used_counter(),
-request.block->ByteSizeLong());
}
q.pop();
_total_queue_size--;
@@ -416,12 +420,24 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId
id) {
_turn_off_channel(id, lock);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
_instance_to_broadcast_package_queue[id];
+ for (; !broadcast_q.empty(); broadcast_q.pop()) {
+ if (broadcast_q.front().block_holder->get_block()) {
+ COUNTER_UPDATE(_parent->memory_used_counter(),
+
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+ }
+ }
{
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>
empty;
swap(empty, broadcast_q);
}
std::queue<TransmitInfo, std::list<TransmitInfo>>& q =
_instance_to_package_queue[id];
+ for (; !q.empty(); q.pop()) {
+ if (q.front().block) {
+ COUNTER_UPDATE(_parent->memory_used_counter(),
-q.front().block->ByteSizeLong());
+ }
+ }
+
{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 85c87df8f4d..1561c105298 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -91,7 +91,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]);
}
}
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// Make sure brpc stub is ready before execution.
for (int i = 0; i < channels.size(); ++i) {
@@ -126,7 +125,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ExchangeSinkOperatorX>();
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
if (_part_type == TPartitionType::UNPARTITIONED || _part_type ==
TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
@@ -348,7 +346,6 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
- _mem_tracker = std::make_unique<MemTracker>("ExchangeSinkOperatorX:");
_compression_type = state->fragement_transmission_compression_type();
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
if (_output_tuple_id == -1) {
@@ -377,7 +374,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
SCOPED_TIMER(local_state.exec_time_counter());
-
local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
bool all_receiver_eof = true;
for (auto& channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
@@ -389,6 +385,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
return Status::EndOfFile("all data stream channels EOF");
}
+ Defer defer([&]() {
+ COUNTER_SET(local_state._peak_memory_usage_counter,
+ local_state._memory_used_counter->value());
+ });
if (_part_type == TPartitionType::UNPARTITIONED ||
local_state.channels.size() == 1) {
// 1. serialize depends on it is not local exchange
// 2. send block
@@ -412,7 +412,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
} else {
auto block_holder =
vectorized::BroadcastPBlockHolder::create_shared();
{
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
bool serialized = false;
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
block, block_holder->get_block(),
local_state._rpc_channels_num,
@@ -444,7 +443,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
idx ==
local_state._last_local_channel_idx);
moved = idx ==
local_state._last_local_channel_idx;
} else {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status =
channel->send_broadcast_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
@@ -470,7 +468,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
auto status = current_channel->send_local_block(block, eos,
true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
auto pblock = std::make_unique<PBlock>();
RETURN_IF_ERROR(local_state._serializer.serialize_block(block,
pblock.get()));
auto status =
current_channel->send_remote_block(std::move(pblock), eos);
@@ -484,8 +481,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
auto rows = block->rows();
{
SCOPED_TIMER(local_state._split_block_hash_compute_timer);
- RETURN_IF_ERROR(
- local_state._partitioner->do_partitioning(state, block,
_mem_tracker.get()));
+ RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
block));
+ }
+ int64_t old_channel_mem_usage = 0;
+ for (const auto& channel : local_state.channels) {
+ old_channel_mem_usage += channel->mem_usage();
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
@@ -498,6 +498,14 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
state, local_state.channels, local_state._partition_count,
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
}
+ int64_t new_channel_mem_usage = 0;
+ for (const auto& channel : local_state.channels) {
+ new_channel_mem_usage += channel->mem_usage();
+ }
+ COUNTER_UPDATE(local_state.memory_used_counter(),
+ new_channel_mem_usage - old_channel_mem_usage);
+ COUNTER_SET(local_state.peak_memory_usage_counter(),
+ local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
@@ -549,10 +557,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
COUNTER_SET(local_state.peak_memory_usage_counter(),
local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+ int64_t old_channel_mem_usage = 0;
+ for (const auto& channel : local_state.channels) {
+ old_channel_mem_usage += channel->mem_usage();
+ }
{
SCOPED_TIMER(local_state._split_block_hash_compute_timer);
- RETURN_IF_ERROR(
- local_state._partitioner->do_partitioning(state, block,
_mem_tracker.get()));
+ RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
block));
}
std::vector<std::vector<uint32>> assignments =
local_state.scale_writer_partitioning_exchanger->accept(block);
@@ -563,6 +574,14 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
block, eos));
}
+ int64_t new_channel_mem_usage = 0;
+ for (const auto& channel : local_state.channels) {
+ new_channel_mem_usage += channel->mem_usage();
+ }
+ COUNTER_UPDATE(local_state.memory_used_counter(),
+ new_channel_mem_usage - old_channel_mem_usage);
+ COUNTER_SET(local_state.peak_memory_usage_counter(),
+ local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby
controlling the number of table sink writers.
// 1. select channel
@@ -573,7 +592,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
auto status = current_channel->send_local_block(block, eos,
true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
auto pblock = std::make_unique<PBlock>();
RETURN_IF_ERROR(local_state._serializer.serialize_block(block,
pblock.get()));
auto status =
current_channel->send_remote_block(std::move(pblock), eos);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index bee34ad1a85..e85deb1a679 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -240,7 +240,6 @@ private:
const std::vector<TPlanFragmentDestination> _dests;
- std::unique_ptr<MemTracker> _mem_tracker;
// Identifier of the destination plan node.
const PlanNodeId _dest_node_id;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index deafb361c57..572e848d569 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -63,8 +63,8 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<ExchangeSourceOperatorX>();
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
- state, p.input_row_desc(), state->fragment_instance_id(),
p.node_id(), p.num_senders(),
- profile(), p.is_merging());
+ state, this, p.input_row_desc(), state->fragment_instance_id(),
p.node_id(),
+ p.num_senders(), profile(), p.is_merging());
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
@@ -186,8 +186,6 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState*
state, vectorized::Block
block->set_num_rows(limit);
local_state.set_num_rows_returned(_limit);
}
- COUNTER_SET(local_state.rows_returned_counter(),
local_state.num_rows_returned());
- COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 8da335f4fa2..9b059831842 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -46,8 +46,6 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState*
state) {
_vpartition = std::make_unique<doris::VOlapTablePartitionParam>(p._schema,
p._partition);
RETURN_IF_ERROR(_vpartition->init());
_state = state;
- // profile must add to state's object pool
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
_block_convertor =
std::make_unique<vectorized::OlapTableBlockConvertor>(p._output_tuple_desc);
_block_convertor->init_autoinc_info(p._schema->db_id(),
p._schema->table_id(),
@@ -288,7 +286,6 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState*
state, vectorized::Bloc
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
- SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get());
if (!local_state._load_block_queue) {
RETURN_IF_ERROR(local_state._initialize_load_queue());
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 4fa9f9a95a6..46272ea1328 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -74,11 +74,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
_runtime_filter_init_timer = ADD_TIMER(profile(), "RuntimeFilterInitTime");
_build_blocks_memory_usage =
- ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks",
TUnit::BYTES, "MemoryUsage", 1);
+ ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildBlocks",
TUnit::BYTES, 1);
_hash_table_memory_usage =
- ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES,
"MemoryUsage", 1);
+ ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable",
TUnit::BYTES, 1);
_build_arena_memory_usage =
- profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES,
"MemoryUsage", 1);
+ ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildKeyArena",
TUnit::BYTES, 1);
// Build phase
auto* record_profile = _should_build_hash_table ? profile() :
faker_runtime_profile();
@@ -302,41 +302,41 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
// Get the key column that needs to be built
Status st = _extract_join_column(block, null_map_val, raw_ptrs,
_build_col_ids);
- st = std::visit(
- vectorized::Overload {
- [&](std::monostate& arg, auto join_op, auto has_null_value,
- auto short_circuit_for_null_in_build_side,
- auto with_other_conjuncts) -> Status {
- LOG(FATAL) << "FATAL: uninited hash table";
- __builtin_unreachable();
- return Status::OK();
- },
- [&](auto&& arg, auto&& join_op, auto has_null_value,
- auto short_circuit_for_null_in_build_side,
- auto with_other_conjuncts) -> Status {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- using JoinOpType = std::decay_t<decltype(join_op)>;
- ProcessHashTableBuild<HashTableCtxType>
hash_table_build_process(
- rows, raw_ptrs, this, state->batch_size(),
state);
- auto old_hash_table_size =
arg.hash_table->get_byte_size();
- auto old_key_size = arg.serialized_keys_size(true);
- auto st = hash_table_build_process.template run<
- JoinOpType::value, has_null_value,
- short_circuit_for_null_in_build_side,
with_other_conjuncts>(
- arg,
- has_null_value ||
short_circuit_for_null_in_build_side
- ? &null_map_val->get_data()
- : nullptr,
- &_shared_state->_has_null_in_build_side);
- _mem_tracker->consume(arg.hash_table->get_byte_size() -
- old_hash_table_size);
- _mem_tracker->consume(arg.serialized_keys_size(true) -
old_key_size);
- return st;
- }},
- *_shared_state->hash_table_variants,
_shared_state->join_op_variants,
- vectorized::make_bool_variant(_build_side_ignore_null),
-
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side),
- vectorized::make_bool_variant((p._have_other_join_conjunct)));
+ st = std::visit(vectorized::Overload {
+ [&](std::monostate& arg, auto join_op, auto
has_null_value,
+ auto short_circuit_for_null_in_build_side,
+ auto with_other_conjuncts) -> Status {
+ LOG(FATAL) << "FATAL: uninited hash table";
+ __builtin_unreachable();
+ return Status::OK();
+ },
+ [&](auto&& arg, auto&& join_op, auto
has_null_value,
+ auto short_circuit_for_null_in_build_side,
+ auto with_other_conjuncts) -> Status {
+ using HashTableCtxType =
std::decay_t<decltype(arg)>;
+ using JoinOpType =
std::decay_t<decltype(join_op)>;
+ ProcessHashTableBuild<HashTableCtxType>
hash_table_build_process(
+ rows, raw_ptrs, this,
state->batch_size(), state);
+ auto st = hash_table_build_process.template
run<
+ JoinOpType::value, has_null_value,
+ short_circuit_for_null_in_build_side,
with_other_conjuncts>(
+ arg,
+ has_null_value ||
short_circuit_for_null_in_build_side
+ ? &null_map_val->get_data()
+ : nullptr,
+
&_shared_state->_has_null_in_build_side);
+ COUNTER_SET(_memory_used_counter,
+
_build_blocks_memory_usage->value() +
+
(int64_t)(arg.hash_table->get_byte_size() +
+
arg.serialized_keys_size(true)));
+ COUNTER_SET(_peak_memory_usage_counter,
+ _memory_used_counter->value());
+ return st;
+ }},
+ *_shared_state->hash_table_variants,
_shared_state->join_op_variants,
+ vectorized::make_bool_variant(_build_side_ignore_null),
+
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side),
+
vectorized::make_bool_variant((p._have_other_join_conjunct)));
return st;
}
@@ -532,7 +532,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit
strategy, we do not need to pull
// data from probe side.
- local_state._build_side_mem_used += in_block->allocated_bytes();
if (local_state._build_side_mutable_block.empty()) {
auto tmp_build_block =
vectorized::VectorizedUtils::create_empty_columnswithtypename(
@@ -559,12 +558,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
std::to_string(std::numeric_limits<uint32_t>::max()));
}
- local_state._mem_tracker->consume(in_block->bytes());
- COUNTER_UPDATE(local_state._build_blocks_memory_usage,
in_block->bytes());
-
SCOPED_TIMER(local_state._build_side_merge_block_timer);
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
std::move(*in_block)));
+ int64_t blocks_mem_usage =
local_state._build_side_mutable_block.allocated_bytes();
+ COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage);
+ COUNTER_SET(local_state._peak_memory_usage_counter,
blocks_mem_usage);
+ COUNTER_SET(local_state._build_blocks_memory_usage,
blocks_mem_usage);
}
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 4833bee5488..f8634ac4c49 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -74,8 +74,6 @@ protected:
std::vector<vectorized::ColumnPtr> _key_columns_holder;
bool _should_build_hash_table = true;
- int64_t _build_side_mem_used = 0;
- int64_t _build_side_last_mem_used = 0;
size_t _build_side_rows = 0;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 4f9184db8a5..e3afa65ae46 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -54,7 +54,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state,
LocalStateInfo& info)
_construct_mutable_join_block();
_probe_column_disguise_null.reserve(_probe_expr_ctxs.size());
_probe_arena_memory_usage =
- profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES,
"MemoryUsage", 1);
+ profile()->AddHighWaterMarkCounter("MemoryUsageProbeKeyArena",
TUnit::BYTES, "", 1);
// Probe phase
_probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
_search_hashtable_timer = ADD_TIMER(profile(),
"ProbeWhenSearchHashTableTime");
@@ -301,8 +301,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
mutable_join_block,
&temp_block,
local_state._probe_block.rows(), _is_mark_join,
_have_other_join_conjunct);
- local_state._mem_tracker->set_consumption(
- arg.serialized_keys_size(false));
} else {
st = Status::InternalError("uninited hash table");
}
@@ -498,6 +496,10 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state,
vectorized::Block* inpu
if (&local_state._probe_block != input_block) {
input_block->swap(local_state._probe_block);
+ COUNTER_SET(local_state._memory_used_counter,
+ (int64_t)local_state._probe_block.allocated_bytes());
+ COUNTER_SET(local_state._peak_memory_usage_counter,
+ local_state._memory_used_counter->value());
}
}
return Status::OK();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 2ccc9aec8c7..7a5c34fb845 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -131,7 +131,6 @@ struct ProcessHashTableProbe {
bool _need_calculate_build_index_has_zero = true;
bool* _has_null_in_build_side;
- RuntimeProfile::Counter* _rows_returned_counter = nullptr;
RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
RuntimeProfile::Counter* _build_side_output_timer = nullptr;
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 5de033b63e8..8123badb726 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -51,7 +51,6 @@
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState
_left_output_slot_flags(parent->left_output_slot_flags()),
_right_output_slot_flags(parent->right_output_slot_flags()),
_has_null_in_build_side(parent->has_null_in_build_side()),
- _rows_returned_counter(parent->_rows_returned_counter),
_search_hashtable_timer(parent->_search_hashtable_timer),
_init_probe_side_timer(parent->_init_probe_side_timer),
_build_side_output_timer(parent->_build_side_output_timer),
@@ -177,8 +176,10 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
false,
hash_table_ctx.hash_table->get_bucket_size());
hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums,
need_judge_null ? null_map :
nullptr);
- COUNTER_SET(_parent->_probe_arena_memory_usage,
- (int64_t)hash_table_ctx.serialized_keys_size(false));
+ int64_t arena_memory_usage =
hash_table_ctx.serialized_keys_size(false);
+ COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage);
+ COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage);
+ COUNTER_SET(_parent->_peak_memory_usage_counter,
_parent->_memory_used_counter->value());
}
return typename HashTableType::State(_parent->_probe_columns);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 304e8e96f0c..c91dccb873f 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -106,7 +106,6 @@ Status
MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
local_state._output_expr_contexts, *output_block, block,
true));
vectorized::materialize_block_inplace(*block);
}
- COUNTER_UPDATE(local_state._rows_returned_counter, block->rows());
return Status::OK();
}
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 79f3cbdf17b..3b5174d87c0 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -322,17 +322,28 @@ Status
OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::
}
});
+ Status status;
auto* local_state = state->get_local_state(operator_id());
+ Defer defer([&]() {
+ if (status.ok()) {
+ if (auto rows = block->rows()) {
+ COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
+ COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
+ }
+ }
+ });
if (_output_row_descriptor) {
local_state->clear_origin_block();
- auto status = get_block(state, &local_state->_origin_block, eos);
+ status = get_block(state, &local_state->_origin_block, eos);
if (UNLIKELY(!status.ok())) {
return status;
}
- return do_projections(state, &local_state->_origin_block, block);
+ status = do_projections(state, &local_state->_origin_block, block);
+ return status;
}
-
local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption());
- return get_block(state, block, eos);
+ status = get_block(state, block, eos);
+
local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value());
+ return status;
}
void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool*
eos) {
@@ -353,8 +364,6 @@ void
PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
if (auto rows = block->rows()) {
_num_rows_returned += rows;
- COUNTER_UPDATE(_blocks_returned_counter, 1);
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
}
@@ -475,10 +484,9 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
- _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" +
_runtime_profile->name());
- _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile,
"MemoryUsage", 1);
- _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
- "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
+ _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile,
"MemoryUsage", TUnit::BYTES, 1);
+ _peak_memory_usage_counter =
+ _runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak",
TUnit::BYTES, "", 1);
return Status::OK();
}
@@ -511,11 +519,8 @@ Status
PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
- if (_rows_returned_counter != nullptr) {
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- }
if (_peak_memory_usage_counter) {
- _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+ _peak_memory_usage_counter->set(_memory_used_counter->value());
}
_closed = true;
// Some kinds of source operators has a 1-1 relationship with a sink
operator (such as AnalyticOperator).
@@ -555,10 +560,9 @@ Status
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
- _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
- _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile,
"MemoryUsage", 1);
+ _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage",
TUnit::BYTES, 1);
_peak_memory_usage_counter =
- _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES,
"MemoryUsage", 1);
+ _profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES,
"", 1);
return Status::OK();
}
@@ -571,7 +575,7 @@ Status
PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
if (_peak_memory_usage_counter) {
- _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+ _peak_memory_usage_counter->set(_memory_used_counter->value());
}
_closed = true;
return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 301f7599737..25c3a68a9f2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -163,10 +163,9 @@ public:
void reached_limit(vectorized::Block* block, bool* eos);
RuntimeProfile* profile() { return _runtime_profile.get(); }
- MemTracker* mem_tracker() { return _mem_tracker.get(); }
- RuntimeProfile::Counter* rows_returned_counter() { return
_rows_returned_counter; }
- RuntimeProfile::Counter* blocks_returned_counter() { return
_blocks_returned_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
+ RuntimeProfile::Counter* memory_used_counter() { return
_memory_used_counter; }
+ RuntimeProfile::Counter* peak_memory_usage_counter() { return
_peak_memory_usage_counter; }
OperatorXBase* parent() { return _parent; }
RuntimeState* state() { return _state; }
vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
@@ -188,16 +187,14 @@ public:
protected:
friend class OperatorXBase;
+ template <typename LocalStateType>
+ friend class ScanOperatorX;
ObjectPool* _pool = nullptr;
int64_t _num_rows_returned {0};
std::unique_ptr<RuntimeProfile> _runtime_profile;
- // Record this node memory size. it is expected that artificial guarantees
are accurate,
- // which will providea reference for operator memory.
- std::unique_ptr<MemTracker> _mem_tracker;
-
std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
RuntimeProfile::Counter* _rows_returned_counter = nullptr;
@@ -342,7 +339,6 @@ public:
DataSinkOperatorXBase* parent() { return _parent; }
RuntimeState* state() { return _state; }
RuntimeProfile* profile() { return _profile; }
- MemTracker* mem_tracker() { return _mem_tracker.get(); }
[[nodiscard]] RuntimeProfile* faker_runtime_profile() const {
return _faker_runtime_profile.get();
}
@@ -350,9 +346,7 @@ public:
RuntimeProfile::Counter* rows_input_counter() { return
_rows_input_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
RuntimeProfile::Counter* memory_used_counter() { return
_memory_used_counter; }
- RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
- return _peak_memory_usage_counter;
- }
+ RuntimeProfile::Counter* peak_memory_usage_counter() { return
_peak_memory_usage_counter; }
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
// override in exchange sink , AsyncWriterSink
@@ -364,7 +358,6 @@ protected:
DataSinkOperatorXBase* _parent = nullptr;
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
- std::unique_ptr<MemTracker> _mem_tracker;
// Set to true after close() has been called. subclasses should check and
set this in
// close().
bool _closed = false;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index fbabdbdc8f8..eba9e8472d3 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -113,9 +113,9 @@ Status PartitionSortSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
_partition_exprs_num = p._partition_exprs_num;
_hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize",
TUnit::UNIT);
_serialize_key_arena_memory_usage =
- _profile->AddHighWaterMarkCounter("SerializeKeyArena",
TUnit::BYTES, "MemoryUsage", 1);
+ _profile->AddHighWaterMarkCounter("MemoryUsageSerializeKeyArena",
TUnit::BYTES, "", 1);
_hash_table_memory_usage =
- ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES,
"MemoryUsage", 1);
+ ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageHashTable",
TUnit::BYTES, 1);
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index f2cd8dea0b9..6d355477ab8 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -58,7 +58,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
}
}
if (!output_block->empty()) {
- COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
@@ -80,7 +79,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
local_state._sort_idx >=
local_state._shared_state->partition_sorts.size();
}
if (!output_block->empty()) {
- COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 469716b7a22..ab0a43f4a63 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -81,10 +81,10 @@ Status PartitionedAggSinkLocalState::close(RuntimeState*
state, Status exec_stat
void PartitionedAggSinkLocalState::_init_counters() {
_internal_runtime_profile =
std::make_unique<RuntimeProfile>("internal_profile");
- _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"HashTable",
- TUnit::BYTES,
"MemoryUsage", 1);
+ _hash_table_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable",
TUnit::BYTES, 1);
_serialize_key_arena_memory_usage =
Base::profile()->AddHighWaterMarkCounter(
- "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+ "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);
_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
@@ -110,8 +110,8 @@ void PartitionedAggSinkLocalState::_init_counters() {
} while (false)
void PartitionedAggSinkLocalState::update_profile(RuntimeProfile*
child_profile) {
- UPDATE_PROFILE(_hash_table_memory_usage, "HashTable");
- UPDATE_PROFILE(_serialize_key_arena_memory_usage, "SerializeKeyArena");
+ UPDATE_PROFILE(_hash_table_memory_usage, "MemoryUsageHashTable");
+ UPDATE_PROFILE(_serialize_key_arena_memory_usage,
"MemoryUsageSerializeKeyArena");
UPDATE_PROFILE(_build_timer, "BuildTime");
UPDATE_PROFILE(_serialize_key_timer, "SerializeKeyTime");
UPDATE_PROFILE(_merge_timer, "MergeTime");
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 018d63a6dee..0e56acc1c57 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -557,8 +557,7 @@ Status
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
}
{
SCOPED_TIMER(local_state._partition_timer);
- RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
input_block,
-
local_state._mem_tracker.get()));
+ RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
input_block));
}
std::vector<std::vector<uint32_t>> partition_indexes(_partition_count);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 32af13ba548..e3ca74c2d53 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -80,7 +80,7 @@ size_t
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
if (inner_sink_state_) {
auto inner_sink_state =
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
- return inner_sink_state->_build_side_mem_used;
+ return inner_sink_state->_build_blocks_memory_usage->value();
}
}
return 0;
@@ -161,7 +161,7 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
{
SCOPED_TIMER(_partition_timer);
- (void)_partitioner->do_partitioning(state, &sub_block,
_mem_tracker.get());
+ (void)_partitioner->do_partitioning(state, &sub_block);
}
const auto* channel_ids =
_partitioner->get_channel_ids().get<uint32_t>();
@@ -334,7 +334,7 @@ Status
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
{
/// TODO: DO NOT execute build exprs twice(when partition and building
hash table)
SCOPED_TIMER(_partition_timer);
- RETURN_IF_ERROR(_partitioner->do_partitioning(state, in_block,
_mem_tracker.get()));
+ RETURN_IF_ERROR(_partitioner->do_partitioning(state, in_block));
}
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
diff --git a/be/src/pipeline/exec/repeat_operator.cpp
b/be/src/pipeline/exec/repeat_operator.cpp
index cd707ccc49f..5c94d43f0d1 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -248,7 +248,6 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state,
vectorized::Block* outp
}
*eos = _child_eos && _child_block.rows() == 0;
local_state.reached_limit(output_block, eos);
- COUNTER_SET(local_state._rows_returned_counter,
local_state._num_rows_returned);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 7c774a5aaa0..5d41c800383 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -358,7 +358,15 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
Status get_block_after_projects(RuntimeState* state, vectorized::Block*
block,
bool* eos) override {
- return get_block(state, block, eos);
+ Status status = get_block(state, block, eos);
+ if (status.ok()) {
+ if (auto rows = block->rows()) {
+ auto* local_state = state->get_local_state(operator_id());
+ COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
+ COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
+ }
+ }
+ return status;
}
[[nodiscard]] bool is_source() const override { return true; }
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 6f67262ef1f..ad7d7297109 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -31,7 +31,7 @@ Status SortSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_sort_blocks_memory_usage =
- ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES,
"MemoryUsage", 1);
+ ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks",
TUnit::BYTES, 1);
_append_blocks_timer = ADD_TIMER(profile(), "AppendBlockTime");
_update_runtime_predicate_timer = ADD_TIMER(profile(),
"UpdateRuntimePredicateTime");
return Status::OK();
@@ -121,12 +121,14 @@ Status SortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Block* in
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (in_block->rows() > 0) {
- COUNTER_UPDATE(local_state._sort_blocks_memory_usage,
(int64_t)in_block->bytes());
{
SCOPED_TIMER(local_state._append_blocks_timer);
RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
}
-
local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size());
+ int64_t data_size = local_state._shared_state->sorter->data_size();
+ COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
+ COUNTER_SET(local_state._memory_used_counter, data_size);
+ COUNTER_SET(local_state._peak_memory_usage_counter, data_size);
RETURN_IF_CANCELLED(state);
if (state->get_query_ctx()->has_runtime_predicate(_node_id)) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 4bf1ab04efb..130affd2f4b 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -51,7 +51,7 @@ void SpillSortSinkLocalState::_init_counters() {
_partial_sort_timer = ADD_TIMER(_profile, "PartialSortTime");
_merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime");
_sort_blocks_memory_usage =
- ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES,
"MemoryUsage", 1);
+ ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks",
TUnit::BYTES, 1);
_spill_merge_sort_timer =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime",
"Spill", 1);
@@ -70,7 +70,7 @@ void SpillSortSinkLocalState::_init_counters() {
void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) {
UPDATE_PROFILE(_partial_sort_timer, "PartialSortTime");
UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime");
- UPDATE_PROFILE(_sort_blocks_memory_usage, "SortBlocks");
+ UPDATE_PROFILE(_sort_blocks_memory_usage, "MemoryUsageSortBlocks");
}
Status SpillSortSinkLocalState::close(RuntimeState* state, Status
execsink_status) {
@@ -156,8 +156,11 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::sink",
{ return Status::InternalError("fault_inject
spill_sort_sink sink failed"); });
RETURN_IF_ERROR(_sort_sink_operator->sink(local_state._runtime_state.get(),
in_block, false));
- local_state._mem_tracker->set_consumption(
-
local_state._shared_state->in_mem_shared_state->sorter->data_size());
+ int64_t data_size =
local_state._shared_state->in_mem_shared_state->sorter->data_size();
+ COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
+ COUNTER_SET(local_state._memory_used_counter, data_size);
+ COUNTER_SET(local_state._peak_memory_usage_counter, data_size);
+
if (eos) {
if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) > 0) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 9ead9c37b17..ab969a129e6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -87,10 +87,10 @@ Status StreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_init_timer);
- _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"HashTable",
- TUnit::BYTES,
"MemoryUsage", 1);
+ _hash_table_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable",
TUnit::BYTES, 1);
_serialize_key_arena_memory_usage =
Base::profile()->AddHighWaterMarkCounter(
- "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+ "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);
_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
@@ -350,10 +350,10 @@ Status
StreamingAggLocalState::_merge_without_key(vectorized::Block* block) {
}
void StreamingAggLocalState::_update_memusage_without_key() {
- auto arena_memory_usage = _agg_arena_pool->size() -
_mem_usage_record.used_in_arena;
- Base::_mem_tracker->consume(arena_memory_usage);
- _serialize_key_arena_memory_usage->add(arena_memory_usage);
- _mem_usage_record.used_in_arena = _agg_arena_pool->size();
+ int64_t arena_memory_usage = _agg_arena_pool->size();
+ COUNTER_SET(_memory_used_counter, arena_memory_usage);
+ COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
+ COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}
Status StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block*
block) {
@@ -365,28 +365,25 @@ Status
StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block* b
}
void StreamingAggLocalState::_update_memusage_with_serialized_key() {
- std::visit(
- vectorized::Overload {
- [&](std::monostate& arg) -> void {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- },
- [&](auto& agg_method) -> void {
- auto& data = *agg_method.hash_table;
- auto arena_memory_usage = _agg_arena_pool->size() +
-
_aggregate_data_container->memory_usage() -
-
_mem_usage_record.used_in_arena;
- Base::_mem_tracker->consume(arena_memory_usage);
-
Base::_mem_tracker->consume(data.get_buffer_size_in_bytes() -
-
_mem_usage_record.used_in_state);
-
_serialize_key_arena_memory_usage->add(arena_memory_usage);
- COUNTER_UPDATE(
- _hash_table_memory_usage,
- data.get_buffer_size_in_bytes() -
_mem_usage_record.used_in_state);
- _mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
- _mem_usage_record.used_in_arena =
- _agg_arena_pool->size() +
_aggregate_data_container->memory_usage();
- }},
- _agg_data->method_variant);
+ std::visit(vectorized::Overload {
+ [&](std::monostate& arg) -> void {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ },
+ [&](auto& agg_method) -> void {
+ auto& data = *agg_method.hash_table;
+ int64_t arena_memory_usage =
_agg_arena_pool->size() +
+
_aggregate_data_container->memory_usage();
+ int64_t hash_table_memory_usage =
data.get_buffer_size_in_bytes();
+
+ COUNTER_SET(_memory_used_counter,
+ arena_memory_usage +
hash_table_memory_usage);
+ COUNTER_SET(_peak_memory_usage_counter,
+ arena_memory_usage +
hash_table_memory_usage);
+
+ COUNTER_SET(_serialize_key_arena_memory_usage,
arena_memory_usage);
+ COUNTER_SET(_hash_table_memory_usage,
hash_table_memory_usage);
+ }},
+ _agg_data->method_variant);
}
template <bool limit>
@@ -508,7 +505,6 @@ Status
StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block,
// pre stream agg need use _num_row_return to decide whether to do pre
stream agg
_cur_num_rows_returned += output_block->rows();
_make_nullable_output_key(output_block);
- // COUNTER_SET(_rows_returned_counter, _num_rows_returned);
_executor->update_memusage(this);
return Status::OK();
}
@@ -1247,7 +1243,6 @@ Status StreamingAggLocalState::close(RuntimeState* state)
{
std::vector<char> tmp_deserialize_buffer;
_deserialize_buffer.swap(tmp_deserialize_buffer);
- Base::_mem_tracker->release(_mem_usage_record.used_in_state +
_mem_usage_record.used_in_arena);
/// _hash_table_size_counter may be null if prepare failed.
if (_hash_table_size_counter) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 59d5491d10c..b695880ac28 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -167,12 +167,6 @@ private:
};
std::unique_ptr<ExecutorBase> _executor = nullptr;
- struct MemoryRecord {
- MemoryRecord() : used_in_arena(0), used_in_state(0) {}
- int64_t used_in_arena;
- int64_t used_in_state;
- };
- MemoryRecord _mem_usage_record;
std::unique_ptr<vectorized::Block> _child_block = nullptr;
bool _child_eos = false;
std::unique_ptr<vectorized::Block> _pre_aggregated_block = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 2d20b8f365c..c4832b9958c 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -26,7 +26,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_channel_id = info.task_idx;
- _shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
+ _shared_state->mem_counters[_channel_id] = _memory_used_counter;
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
_get_block_failed_counter =
@@ -105,8 +105,8 @@ std::string LocalExchangeSourceLocalState::debug_string(int
indentation_level) c
_exchanger->data_queue_debug_string(_channel_id));
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
- for (auto* mem_tracker : _shared_state->mem_trackers) {
- fmt::format_to(debug_string_buffer, "{}: {}, ", i,
mem_tracker->consumption());
+ for (auto* mem_counter : _shared_state->mem_counters) {
+ fmt::format_to(debug_string_buffer, "{}: {}, ", i,
mem_counter->value());
i++;
}
return fmt::to_string(debug_string_buffer);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index fa34b6a4040..23f91cca631 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -118,8 +118,7 @@ Status ShuffleExchanger::sink(RuntimeState* state,
vectorized::Block* in_block,
}
{
SCOPED_TIMER(local_state._compute_hash_value_timer);
- RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
in_block,
-
local_state.mem_tracker()));
+ RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
in_block));
}
{
SCOPED_TIMER(local_state._distribute_timer);
diff --git a/be/src/vec/runtime/partitioner.cpp
b/be/src/vec/runtime/partitioner.cpp
index 0d6165b7555..d0c8f2d8fcd 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -25,8 +25,8 @@
namespace doris::vectorized {
template <typename HashValueType, typename ChannelIds>
-Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState*
state, Block* block,
- MemTracker*
mem_tracker) const {
+Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState*
state,
+ Block* block)
const {
int rows = block->rows();
if (rows > 0) {
@@ -38,10 +38,7 @@ Status Partitioner<HashValueType,
ChannelIds>::do_partitioning(RuntimeState* sta
_hash_vals.resize(rows);
std::fill(_hash_vals.begin(), _hash_vals.end(), 0);
auto* __restrict hashes = _hash_vals.data();
- {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker);
- RETURN_IF_ERROR(_get_partition_column_result(block, result));
- }
+ RETURN_IF_ERROR(_get_partition_column_result(block, result));
for (int j = 0; j < result_size; ++j) {
_do_hash(unpack_if_const(block->get_by_position(result[j]).column).first,
hashes, j);
}
@@ -50,10 +47,7 @@ Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* sta
hashes[i] = ChannelIds()(hashes[i], _partition_count);
}
- {
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker);
- Block::erase_useless_column(block, column_to_keep);
- }
+ Block::erase_useless_column(block, column_to_keep);
}
return Status::OK();
}
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index 3152edb5cb5..5607a83327b 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -48,8 +48,7 @@ public:
virtual Status open(RuntimeState* state) = 0;
- virtual Status do_partitioning(RuntimeState* state, Block* block,
- MemTracker* mem_tracker) const = 0;
+ virtual Status do_partitioning(RuntimeState* state, Block* block) const =
0;
virtual ChannelField get_channel_ids() const = 0;
@@ -75,8 +74,7 @@ public:
Status open(RuntimeState* state) override { return
VExpr::open(_partition_expr_ctxs, state); }
- Status do_partitioning(RuntimeState* state, Block* block,
- MemTracker* mem_tracker) const override;
+ Status do_partitioning(RuntimeState* state, Block* block) const override;
ChannelField get_channel_ids() const override {
return {_hash_vals.data(), sizeof(HashValueType)};
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index c14d119e0fe..78067b9b181 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -61,12 +61,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instanc
}
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
- RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile,
bool is_merging) {
+ RuntimeState* state, pipeline::ExchangeLocalState* parent, const
RowDescriptor& row_desc,
+ const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int
num_senders,
+ RuntimeProfile* profile, bool is_merging) {
DCHECK(profile != nullptr);
VLOG_FILE << "creating receiver for fragment=" <<
print_id(fragment_instance_id)
<< ", node=" << dest_node_id;
- std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, state,
row_desc,
+ std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, parent,
state, row_desc,
fragment_instance_id, dest_node_id,
num_senders,
is_merging, profile));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h
index 09e347fcfb2..bd5e6f9b91e 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -40,6 +40,9 @@ class RuntimeState;
class RowDescriptor;
class RuntimeProfile;
class PTransmitDataParams;
+namespace pipeline {
+class ExchangeLocalState;
+}
namespace vectorized {
class VDataStreamRecvr;
@@ -50,6 +53,7 @@ public:
~VDataStreamMgr();
std::shared_ptr<VDataStreamRecvr> create_recvr(RuntimeState* state,
+
pipeline::ExchangeLocalState* parent,
const RowDescriptor&
row_desc,
const TUniqueId&
fragment_instance_id,
PlanNodeId dest_node_id,
int num_senders,
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 1ca6bb7f2c5..f1dfbbf3047 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -96,6 +96,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
_block_queue.pop_front();
+ _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
sub_blocks_memory_usage(block_byte_size);
_record_debug_info();
if (_block_queue.empty() && _source_dependency) {
@@ -207,6 +208,9 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
_pending_closures.emplace_back(*done, monotonicStopWatch);
*done = nullptr;
}
+ _recvr->_parent->memory_used_counter()->update(block_byte_size);
+ _recvr->_parent->peak_memory_usage_counter()->set(
+ _recvr->_parent->memory_used_counter()->value());
add_blocks_memory_usage(block_byte_size);
return Status::OK();
}
@@ -245,6 +249,9 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
_record_debug_info();
try_set_dep_ready_without_lock();
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
+ _recvr->_parent->memory_used_counter()->update(block_mem_size);
+ _recvr->_parent->peak_memory_usage_counter()->set(
+ _recvr->_parent->memory_used_counter()->value());
add_blocks_memory_usage(block_mem_size);
}
}
@@ -315,12 +322,13 @@ void VDataStreamRecvr::SenderQueue::close() {
_block_queue.clear();
}
-VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState*
state,
- const RowDescriptor& row_desc,
+VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
pipeline::ExchangeLocalState* parent,
+ RuntimeState* state, const RowDescriptor&
row_desc,
const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id,
int num_senders, bool is_merging,
RuntimeProfile* profile)
: HasTaskExecutionCtx(state),
_mgr(stream_mgr),
+ _parent(parent),
_query_thread_context(state->query_id(), state->query_mem_tracker(),
state->get_query_ctx()->workload_group()),
_fragment_instance_id(fragment_instance_id),
@@ -352,9 +360,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta
}
// Initialize the counters
- _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
- _peak_memory_usage_counter =
- _profile->add_counter("PeakMemoryUsage", TUnit::BYTES,
"MemoryUsage");
_remote_bytes_received_counter = ADD_COUNTER(_profile,
"RemoteBytesReceived", TUnit::BYTES);
_local_bytes_received_counter = ADD_COUNTER(_profile,
"LocalBytesReceived", TUnit::BYTES);
@@ -417,7 +422,6 @@ std::shared_ptr<pipeline::Dependency> VDataStreamRecvr::get_local_channel_depend
}
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
- _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
if (!_is_merging) {
block->clear();
return _sender_queues[0]->get_batch(block, eos);
@@ -492,9 +496,6 @@ void VDataStreamRecvr::close() {
_mgr = nullptr;
_merger.reset();
- if (_peak_memory_usage_counter) {
- _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
- }
}
void VDataStreamRecvr::set_sink_dep_always_ready() const {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index e8dcfdedba5..b2d76590ba2 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -69,7 +69,8 @@ class VDataStreamRecvr;
class VDataStreamRecvr : public HasTaskExecutionCtx {
public:
class SenderQueue;
- VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const
RowDescriptor& row_desc,
+ VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState*
parent,
+ RuntimeState* state, const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId
dest_node_id,
int num_senders, bool is_merging, RuntimeProfile*
profile);
@@ -120,6 +121,8 @@ private:
// DataStreamMgr instance used to create this recvr. (Not owned)
VDataStreamMgr* _mgr = nullptr;
+ pipeline::ExchangeLocalState* _parent = nullptr;
+
QueryThreadContext _query_thread_context;
// Fragment and node id of the destination exchange node this receiver is
used by.
@@ -152,8 +155,6 @@ private:
RuntimeProfile::Counter* _data_arrival_timer = nullptr;
RuntimeProfile::Counter* _decompress_timer = nullptr;
RuntimeProfile::Counter* _decompress_bytes = nullptr;
- RuntimeProfile::Counter* _memory_usage_counter = nullptr;
- RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
// Number of rows received
RuntimeProfile::Counter* _rows_produced_counter = nullptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index a458d7f5ef1..e169d57e05e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -253,12 +253,10 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu
bool* serialized, bool eos,
const std::vector<uint32_t>*
rows) {
if (_mutable_block == nullptr) {
- SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
_mutable_block = MutableBlock::create_unique(block->clone_empty());
}
{
- SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
if (rows) {
if (!rows->empty()) {
const auto* begin = rows->data();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]