This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b5ee4a9dbb9 [enhancement](profilev2) add some fields for profile v2
(#25611)
b5ee4a9dbb9 is described below
commit b5ee4a9dbb9faeb80984ed55b558226360825137
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Oct 23 15:55:40 2023 +0800
[enhancement](profilev2) add some fields for profile v2 (#25611)
Add 3 counters for ExecNode:
ExecTime - Total execution time(excluding the execution time of children).
OutputBytes - The total number of bytes output to parent.
BlockCount - The total count of blocks output to parent.
---
be/src/exec/exec_node.cpp | 12 ++++++++++++
be/src/exec/exec_node.h | 3 +++
be/src/vec/exec/distinct_vaggregation_node.cpp | 2 ++
be/src/vec/exec/join/vhash_join_node.cpp | 4 ++++
be/src/vec/exec/join/vnested_loop_join_node.cpp | 4 ++++
be/src/vec/exec/scan/vscan_node.cpp | 3 +++
be/src/vec/exec/vaggregation_node.cpp | 7 +++++--
be/src/vec/exec/vaggregation_node.h | 1 -
be/src/vec/exec/vanalytic_eval_node.cpp | 5 +++++
be/src/vec/exec/vexchange_node.cpp | 3 +++
be/src/vec/exec/vpartition_sort_node.cpp | 4 ++++
be/src/vec/exec/vrepeat_node.cpp | 4 ++++
be/src/vec/exec/vselect_node.cpp | 1 +
be/src/vec/exec/vset_operation_node.cpp | 5 +++++
be/src/vec/exec/vsort_node.cpp | 4 ++++
be/src/vec/exec/vtable_function_node.cpp | 1 +
be/src/vec/exec/vtable_function_node.h | 3 +++
be/src/vec/exec/vunion_node.cpp | 5 +++++
18 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 352e111f1cb..4c6a734347e 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -129,8 +129,13 @@ Status ExecNode::prepare(RuntimeState* state) {
DCHECK(_runtime_profile.get() != nullptr);
_span = state->get_tracer()->StartSpan(get_name());
OpentelemetryScope scope {_span};
+
+ _exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1);
_rows_returned_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsReturned",
TUnit::UNIT, 1);
+ _output_bytes_counter =
+ ADD_COUNTER_WITH_LEVEL(_runtime_profile, "OutputBytes",
TUnit::BYTES, 1);
+ _block_count_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile,
"BlockCount", TUnit::UNIT, 1);
_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
_rows_returned_rate = runtime_profile()->add_derived_counter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
@@ -518,6 +523,7 @@ std::string ExecNode::get_name() {
}
Status ExecNode::do_projections(vectorized::Block* origin_block,
vectorized::Block* output_block) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
using namespace vectorized;
MutableBlock mutable_block =
@@ -551,6 +557,12 @@ Status ExecNode::get_next_after_projects(
RuntimeState* state, vectorized::Block* block, bool* eos,
const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>&
func,
bool clear_data) {
+ Defer defer([block, this]() {
+ if (block && !block->empty()) {
+ COUNTER_UPDATE(_output_bytes_counter, block->allocated_bytes());
+ COUNTER_UPDATE(_block_count_counter, 1);
+ }
+ });
if (_output_row_descriptor) {
if (clear_data) {
clear_origin_block();
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 0fd691e3d85..8aeecc2b8ca 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -277,7 +277,10 @@ protected:
// which will providea reference for operator memory.
std::unique_ptr<MemTracker> _mem_tracker;
+ RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _rows_returned_counter;
+ RuntimeProfile::Counter* _output_bytes_counter;
+ RuntimeProfile::Counter* _block_count_counter;
RuntimeProfile::Counter* _rows_returned_rate;
RuntimeProfile::Counter* _memory_used_counter;
RuntimeProfile::Counter* _projection_timer;
diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp
b/be/src/vec/exec/distinct_vaggregation_node.cpp
index e2c06cc7234..642ad99bd93 100644
--- a/be/src/vec/exec/distinct_vaggregation_node.cpp
+++ b/be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -35,6 +35,7 @@ DistinctAggregationNode::DistinctAggregationNode(ObjectPool*
pool, const TPlanNo
Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
doris::vectorized::Block* in_block, doris::vectorized::Block*
out_block) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());
@@ -82,6 +83,7 @@ Status
DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
void
DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Selector&
distinct_row,
ColumnRawPtrs& key_columns,
const
size_t num_rows) {
+ SCOPED_TIMER(_exec_timer);
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index f0427a16bd9..70eb7dd8a94 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -316,6 +316,7 @@ void HashJoinNode::prepare_for_next() {
}
Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block*
output_block, bool* eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
if (_short_circuit_for_probe) {
// If we use a short-circuit strategy, should return empty block
directly.
@@ -495,6 +496,7 @@ Status
HashJoinNode::_filter_data_and_build_output(RuntimeState* state,
}
Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block*
input_block, bool eos) {
+ SCOPED_TIMER(_exec_timer);
_probe_eos = eos;
if (input_block->rows() > 0) {
COUNTER_UPDATE(_probe_rows_counter, input_block->rows());
@@ -670,6 +672,7 @@ Status HashJoinNode::open(RuntimeState* state) {
}
Status HashJoinNode::alloc_resource(doris::RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_allocate_resource_timer);
RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state));
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
@@ -724,6 +727,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState*
state) {
}
Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block*
in_block, bool eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_build_timer);
// make one block for each 4 gigabytes
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index c409939aa82..b4f5814a3af 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -134,6 +134,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
+ SCOPED_TIMER(_exec_timer);
_build_get_next_timer = ADD_TIMER(_build_phase_profile,
"BuildGetNextTime");
_build_timer = ADD_TIMER(_build_phase_profile, "BuildTime");
_build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows",
TUnit::UNIT);
@@ -201,6 +202,7 @@ Status
VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) {
}
Status VNestedLoopJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* block, bool eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_build_timer);
auto rows = block->rows();
auto mem_usage = block->allocated_bytes();
@@ -230,6 +232,7 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState*
state, vectorized::Block*
}
Status VNestedLoopJoinNode::push(doris::RuntimeState* state,
vectorized::Block* block, bool eos) {
+ SCOPED_TIMER(_exec_timer);
COUNTER_UPDATE(_probe_rows_counter, block->rows());
_cur_probe_row_visited_flags.resize(block->rows());
std::fill(_cur_probe_row_visited_flags.begin(),
_cur_probe_row_visited_flags.end(), 0);
@@ -662,6 +665,7 @@ void VNestedLoopJoinNode::_release_mem() {
}
Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block*
block, bool* eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
if (_is_output_left_side_only) {
RETURN_IF_ERROR(_build_output_block(&_left_block, block));
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index db4a4320a66..57dc6b66fd0 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -166,6 +166,7 @@ Status VScanNode::prepare(RuntimeState* state) {
}
Status VScanNode::open(RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_CANCELLED(state);
@@ -173,6 +174,7 @@ Status VScanNode::open(RuntimeState* state) {
}
Status VScanNode::alloc_resource(RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_alloc_resource_timer);
if (_opened) {
return Status::OK();
@@ -220,6 +222,7 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
}
Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block,
bool* eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_get_next_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
// in inverted index apply logic, in order to optimize query performance,
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 758fa4b7a5b..89a97998b76 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -109,7 +109,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const
TPlanNode& tnode,
_hash_table_input_counter(nullptr),
_build_timer(nullptr),
_expr_timer(nullptr),
- _exec_timer(nullptr),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
_output_tuple_id(tnode.agg_node.output_tuple_id),
@@ -257,7 +256,6 @@ Status AggregationNode::prepare_profile(RuntimeState*
state) {
_build_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "BuildTime", 1);
_build_table_convert_timer = ADD_TIMER(runtime_profile(),
"BuildConvertToPartitionedTime");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
- _exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1);
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
_expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
@@ -417,11 +415,13 @@ Status AggregationNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(prepare_profile(state));
return Status::OK();
}
Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
@@ -472,6 +472,7 @@ Status AggregationNode::open(RuntimeState* state) {
Status AggregationNode::do_pre_agg(vectorized::Block* input_block,
vectorized::Block* output_block) {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_executor.pre_agg(input_block, output_block));
// pre stream agg need use _num_row_return to decide whether to do pre
stream agg
@@ -510,6 +511,7 @@ Status AggregationNode::get_next(RuntimeState* state,
Block* block, bool* eos) {
}
Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block*
block, bool* eos) {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_executor.get_result(state, block, eos));
_make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
@@ -520,6 +522,7 @@ Status AggregationNode::pull(doris::RuntimeState* state,
vectorized::Block* bloc
}
Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block*
in_block, bool eos) {
+ SCOPED_TIMER(_exec_timer);
if (in_block->rows() > 0) {
RETURN_IF_ERROR(_executor.execute(in_block));
RETURN_IF_ERROR(_try_spill_disk());
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index f8b09bf8d4d..6fab79bb51d 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -425,7 +425,6 @@ protected:
RuntimeProfile::Counter* _hash_table_input_counter;
RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _expr_timer;
- RuntimeProfile::Counter* _exec_timer;
private:
friend class pipeline::AggSinkOperator;
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp
b/be/src/vec/exec/vanalytic_eval_node.cpp
index 49fbc1c7118..a73011e24e4 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -264,6 +264,7 @@ Status VAnalyticEvalNode::alloc_resource(RuntimeState*
state) {
Status VAnalyticEvalNode::pull(doris::RuntimeState* /*state*/,
vectorized::Block* output_block,
bool* eos) {
+ SCOPED_TIMER(_exec_timer);
if (_input_eos && (_output_block_index == _input_blocks.size() ||
_input_total_rows == 0)) {
*eos = true;
return Status::OK();
@@ -290,6 +291,7 @@ Status VAnalyticEvalNode::pull(doris::RuntimeState*
/*state*/, vectorized::Block
}
void VAnalyticEvalNode::release_resource(RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
if (is_closed()) {
return;
}
@@ -335,6 +337,8 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state,
vectorized::Block* block
break;
}
}
+
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_output_current_block(block));
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, block,
block->columns()));
reached_limit(block, eos);
@@ -540,6 +544,7 @@ Status
VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
Status VAnalyticEvalNode::sink(doris::RuntimeState* /*state*/,
vectorized::Block* input_block,
bool eos) {
+ SCOPED_TIMER(_exec_timer);
_input_eos = eos;
if (_input_eos && input_block->rows() == 0) {
_need_more_input = false;
diff --git a/be/src/vec/exec/vexchange_node.cpp
b/be/src/vec/exec/vexchange_node.cpp
index 3d9a50ded23..7eb98588892 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -63,6 +63,7 @@ Status VExchangeNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status VExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_TIMER(_exec_timer);
DCHECK_GT(_num_senders, 0);
_sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
_stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
@@ -76,6 +77,7 @@ Status VExchangeNode::prepare(RuntimeState* state) {
}
Status VExchangeNode::alloc_resource(RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
@@ -96,6 +98,7 @@ Status VExchangeNode::open(RuntimeState* state) {
}
Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(runtime_profile()->total_time_counter());
if (_is_merging && state->enable_pipeline_exec() && !_is_ready) {
RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp
b/be/src/vec/exec/vpartition_sort_node.cpp
index 464c742b24b..6db90763fa7 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -81,6 +81,7 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
_emplace_key_timer = ADD_TIMER(runtime_profile(), "EmplaceKeyTime");
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(),
_row_descriptor));
RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state,
child(0)->row_desc()));
_init_hash_method();
@@ -144,6 +145,7 @@ void VPartitionSortNode::_emplace_into_hash_table(const
ColumnRawPtrs& key_colum
}
Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block*
input_block, bool eos) {
+ SCOPED_TIMER(_exec_timer);
auto current_rows = input_block->rows();
if (current_rows > 0) {
child_input_rows = child_input_rows + current_rows;
@@ -229,6 +231,7 @@ Status VPartitionSortNode::open(RuntimeState* state) {
}
Status VPartitionSortNode::alloc_resource(RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
@@ -245,6 +248,7 @@ bool VPartitionSortNode::can_read() {
Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block*
output_block,
bool* eos) {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_CANCELLED(state);
output_block->clear_column_data();
{
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index b839a3285ab..1e522daa090 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -68,6 +68,7 @@ Status VRepeatNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_TIMER(_exec_timer);
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
if (_output_tuple_desc == nullptr) {
return Status::InternalError("Failed to get tuple descriptor.");
@@ -91,6 +92,7 @@ Status VRepeatNode::open(RuntimeState* state) {
}
Status VRepeatNode::alloc_resource(RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_ERROR(VExpr::open(_expr_ctxs, state));
@@ -172,6 +174,7 @@ Status VRepeatNode::get_repeated_block(Block* child_block,
int repeat_id_idx, Bl
}
Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block*
output_block, bool* eos) {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_CANCELLED(state);
DCHECK(_repeat_id_idx >= 0);
for (const std::vector<int64_t>& v : _grouping_list) {
@@ -200,6 +203,7 @@ Status VRepeatNode::pull(doris::RuntimeState* state,
vectorized::Block* output_b
}
Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block,
bool eos) {
+ SCOPED_TIMER(_exec_timer);
_child_eos = eos;
DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
DCHECK(!_expr_ctxs.empty());
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index ee1628cd19f..0288e0ce1be 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -74,6 +74,7 @@ Status VSelectNode::get_next(RuntimeState* state,
vectorized::Block* block, bool
}
Status VSelectNode::pull(RuntimeState* state, vectorized::Block* output_block,
bool* eos) {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block,
output_block->columns()));
reached_limit(output_block, eos);
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index d284385b8ed..e8e7500b948 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -105,6 +105,7 @@ Status VSetOperationNode<is_intersect>::init(const
TPlanNode& tnode, RuntimeStat
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::alloc_resource(RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
// open result expr lists.
for (const VExprContextSPtrs& exprs : _child_expr_lists) {
RETURN_IF_ERROR(VExpr::open(exprs, state));
@@ -155,6 +156,7 @@ template <bool is_intersect>
Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_TIMER(_exec_timer);
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
_pull_timer = ADD_TIMER(runtime_profile(), "PullTime");
@@ -223,6 +225,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block*
block, bool eos) {
+ SCOPED_TIMER(_exec_timer);
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
if (block->rows() != 0) {
@@ -259,6 +262,7 @@ Status VSetOperationNode<is_intersect>::sink(RuntimeState*
state, Block* block,
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::pull(RuntimeState* state, Block*
output_block, bool* eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_pull_timer);
create_mutable_cols(output_block);
auto st = std::visit(
@@ -352,6 +356,7 @@ void
VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* state, int
child_id, Block* block,
bool eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
CHECK(_build_finished) << "cannot sink probe data before build finished";
if (child_id > 1) {
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index d32110c7b6f..538bed4eb6c 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -113,6 +113,7 @@ Status VSortNode::init(const TPlanNode& tnode,
RuntimeState* state) {
Status VSortNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_TIMER(_exec_timer);
_runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" :
"true");
_memory_usage_counter = ADD_LABEL_COUNTER(runtime_profile(),
"MemoryUsage");
@@ -127,6 +128,7 @@ Status VSortNode::prepare(RuntimeState* state) {
}
Status VSortNode::alloc_resource(doris::RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_CANCELLED(state);
@@ -136,6 +138,7 @@ Status VSortNode::alloc_resource(doris::RuntimeState*
state) {
}
Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block,
bool eos) {
+ SCOPED_TIMER(_exec_timer);
if (input_block->rows() > 0) {
RETURN_IF_ERROR(_sorter->append_block(input_block));
RETURN_IF_CANCELLED(state);
@@ -201,6 +204,7 @@ Status VSortNode::open(RuntimeState* state) {
}
Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block*
output_block, bool* eos) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_get_next_timer);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sorter->get_next(state, output_block,
eos));
reached_limit(output_block, eos);
diff --git a/be/src/vec/exec/vtable_function_node.cpp
b/be/src/vec/exec/vtable_function_node.cpp
index f66b3e3e620..23da667b7d6 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -101,6 +101,7 @@ bool VTableFunctionNode::_is_inner_and_empty() {
Status VTableFunctionNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_TIMER(_exec_timer);
_num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered",
TUnit::UNIT);
for (auto fn : _fns) {
diff --git a/be/src/vec/exec/vtable_function_node.h
b/be/src/vec/exec/vtable_function_node.h
index 94779880477..fd9bde2cc90 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -59,6 +59,7 @@ public:
return _children[0]->open(state);
}
Status alloc_resource(RuntimeState* state) override {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
return VExpr::open(_vfn_ctxs, state);
}
@@ -73,6 +74,7 @@ public:
}
Status push(RuntimeState* state, Block* input_block, bool eos) override {
+ SCOPED_TIMER(_exec_timer);
_child_eos = eos;
if (input_block->rows() == 0) {
return Status::OK();
@@ -86,6 +88,7 @@ public:
}
Status pull(RuntimeState* state, Block* output_block, bool* eos) override {
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_get_expanded_block(state, output_block, eos));
reached_limit(output_block, eos);
return Status::OK();
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index a27a1b7a425..f3063addf5d 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -80,6 +80,7 @@ Status VUnionNode::init(const TPlanNode& tnode, RuntimeState*
state) {
Status VUnionNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_TIMER(_exec_timer);
_materialize_exprs_evaluate_timer =
ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer");
// Prepare const expr lists.
@@ -105,6 +106,7 @@ Status VUnionNode::open(RuntimeState* state) {
}
Status VUnionNode::alloc_resource(RuntimeState* state) {
+ SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_runtime_profile->total_time_counter());
std::unique_lock<std::mutex> l(_resource_lock);
@@ -211,6 +213,7 @@ Status VUnionNode::get_next_materialized(RuntimeState*
state, Block* block) {
}
Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
+ SCOPED_TIMER(_exec_timer);
DCHECK_EQ(state->per_fragment_instance_idx(), 0);
DCHECK_LT(_const_expr_list_idx, _const_expr_lists.size());
@@ -282,6 +285,7 @@ Status VUnionNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
} else if (has_more_const(state)) {
RETURN_IF_ERROR(get_next_const(state, block));
}
+ SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, block,
block->columns()));
*eos = (!has_more_passthrough() && !has_more_materialized() &&
!has_more_const(state));
@@ -318,6 +322,7 @@ void VUnionNode::debug_string(int indentation_level,
std::stringstream* out) con
}
Status VUnionNode::materialize_block(Block* src_block, int child_idx, Block*
res_block) {
+ SCOPED_TIMER(_exec_timer);
const auto& child_exprs = _child_expr_lists[child_idx];
ColumnsWithTypeAndName colunms;
for (size_t i = 0; i < child_exprs.size(); ++i) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]