This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 10483ea12c7 [fix](profile) fix error set with peak_memory_usage in pipeline #27749
10483ea12c7 is described below
commit 10483ea12c7099a2dfcda46a428e5c52b33840ac
Author: Mryange <[email protected]>
AuthorDate: Sat Dec 2 14:12:38 2023 +0800
[fix](profile) fix error set with peak_memory_usage in pipeline #27749
---
be/src/exec/exec_node.cpp | 6 +++---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 12 ++++++------
be/src/pipeline/exec/exchange_sink_operator.cpp | 5 +----
be/src/pipeline/exec/exchange_sink_operator.h | 1 -
be/src/pipeline/pipeline_x/dependency.h | 6 ------
be/src/pipeline/pipeline_x/operator.cpp | 9 +++++++++
be/src/pipeline/pipeline_x/operator.h | 2 ++
be/src/vec/runtime/vdata_stream_recvr.cpp | 3 +++
8 files changed, 24 insertions(+), 20 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 354f80d3c10..4681218dcae 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -200,6 +200,9 @@ void ExecNode::release_resource(doris::RuntimeState* state)
{
_is_resource_released = true;
}
+ if (_peak_memory_usage_counter) {
+ _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+ }
}
Status ExecNode::close(RuntimeState* state) {
@@ -218,9 +221,6 @@ Status ExecNode::close(RuntimeState* state) {
result = st;
}
}
- if (_peak_memory_usage_counter) {
- _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
- }
release_resource(state);
LOG(INFO) << "query= " << print_id(state->query_id())
<< ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index c2bb041abb2..49cbe30a3b2 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -243,10 +243,9 @@ void AggSinkLocalState<DependencyType,
Derived>::_update_memusage_with_serialize
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Base::_shared_state->mem_usage_record.used_in_arena;
- Base::_shared_state->mem_tracker->consume(arena_memory_usage);
- Base::_shared_state->mem_tracker->consume(
- data.get_buffer_size_in_bytes() -
- Base::_shared_state->mem_usage_record.used_in_state);
+ Base::_mem_tracker->consume(arena_memory_usage);
+ Base::_mem_tracker->consume(data.get_buffer_size_in_bytes() -
+ Base::_shared_state->mem_usage_record.used_in_state);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_UPDATE(_hash_table_memory_usage,
data.get_buffer_size_in_bytes() -
@@ -438,7 +437,7 @@ template <typename DependencyType, typename Derived>
void AggSinkLocalState<DependencyType,
Derived>::_update_memusage_without_key() {
auto arena_memory_usage =
_agg_arena_pool->size() -
Base::_shared_state->mem_usage_record.used_in_arena;
- Base::_shared_state->mem_tracker->consume(arena_memory_usage);
+ Base::_mem_tracker->consume(arena_memory_usage);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
Base::_shared_state->mem_usage_record.used_in_arena =
_agg_arena_pool->size();
}
@@ -877,7 +876,8 @@ Status AggSinkLocalState<DependencyType,
Derived>::close(RuntimeState* state, St
std::vector<char> tmp_deserialize_buffer;
_deserialize_buffer.swap(tmp_deserialize_buffer);
-
+ Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state +
+ Base::_shared_state->mem_usage_record.used_in_arena);
return Base::close(state, exec_status);
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 1c66ec02207..20d612d5785 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -123,9 +123,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
_local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent",
TUnit::BYTES);
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
- _peak_memory_usage_counter =
- _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
-
static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER(_profile, timer_name);
_wait_queue_timer = ADD_CHILD_TIMER(_profile, "WaitForRpcBufferQueue",
timer_name);
@@ -312,7 +309,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
SCOPED_TIMER(local_state.exec_time_counter());
- local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+ local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
bool all_receiver_eof = true;
for (auto channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 34502928880..048c8d3910c 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -219,7 +219,6 @@ private:
RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
- RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 9fbb25aaa2f..49dc327c796 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -353,13 +353,9 @@ public:
int64_t used_in_state;
};
MemoryRecord mem_usage_record;
- std::unique_ptr<MemTracker> mem_tracker = std::make_unique<MemTracker>("AggregateOperator");
bool agg_data_created_without_key = false;
private:
- void _release_tracker() {
- mem_tracker->release(mem_usage_record.used_in_state + mem_usage_record.used_in_arena);
- }
void _close_with_serialized_key() {
std::visit(
[&](auto&& agg_method) -> void {
@@ -379,7 +375,6 @@ private:
}
},
agg_data->method_variant);
- _release_tracker();
}
void _close_without_key() {
//because prepare maybe failed, and couldn't create agg data.
@@ -389,7 +384,6 @@ private:
static_cast<void>(_destroy_agg_status(agg_data->without_key));
agg_data_created_without_key = false;
}
- _release_tracker();
}
Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
for (int i = 0; i < aggregate_evaluators.size(); ++i) {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 375545448fb..04f7cea0314 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -391,6 +391,9 @@ Status
PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
+ if (_peak_memory_usage_counter) {
+ _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+ }
_closed = true;
return Status::OK();
}
@@ -427,6 +430,9 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
_mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
+ _memory_used_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
+ _peak_memory_usage_counter =
+ _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}
@@ -442,6 +448,9 @@ Status
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
+ if (_peak_memory_usage_counter) {
+ _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+ }
_closed = true;
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 294eb962ee1..58c18db7038 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -396,6 +396,8 @@ protected:
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
+ RuntimeProfile::Counter* _memory_used_counter = nullptr;
+ RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
std::shared_ptr<Dependency> _finish_dependency;
};
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index d5d460e80b9..ca0da254e98 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -32,6 +32,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
+#include "util/defer_op.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/core/materialize_block.h"
@@ -432,6 +433,7 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock,
int sender_id, int be_n
}
void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
+ _mem_tracker->consume(block->allocated_bytes());
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->add_block(block, use_move);
}
@@ -458,6 +460,7 @@ bool VDataStreamRecvr::ready_to_read() {
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+ Defer release_mem([&]() { _mem_tracker->release(block->allocated_bytes()); });
if (!_is_merging) {
block->clear();
return _sender_queues[0]->get_batch(block, eos);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]