This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a0de08255d [pipelineX](profile) Add necessary metrics (#24820)
a0de08255d is described below
commit a0de08255d78feac8d6067c7d7c8a1ba1fb7c655
Author: Gabriel <[email protected]>
AuthorDate: Sat Sep 23 19:27:45 2023 +0800
[pipelineX](profile) Add necessary metrics (#24820)
---
be/src/pipeline/exec/multi_cast_data_stream_source.h | 2 +-
be/src/pipeline/exec/partition_sort_source_operator.cpp | 3 +++
be/src/pipeline/exec/partition_sort_source_operator.h | 2 ++
be/src/pipeline/pipeline_x/operator.h | 5 +++++
4 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index e9093cff58..aa20272d07 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -177,7 +177,6 @@ class MultiCastDataStreamSinkLocalState final
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: Base(parent, state) {}
- std::shared_ptr<pipeline::MultiCastDataStreamer>
multi_cast_data_streamer();
friend class MultiCastDataStreamSinkOperatorX;
friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
using Base = PipelineXSinkLocalState<MultiCastDependency>;
@@ -209,6 +208,7 @@ public:
SourceState source_state) override {
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
+ COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
auto st =
local_state._shared_state->_multi_cast_data_streamer->push(
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 52d3470413..25ea12e203 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -33,6 +33,8 @@ Status PartitionSortSourceLocalState::close(RuntimeState*
state) {
if (_closed) {
return Status::OK();
}
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_close_timer);
_shared_state->previous_row = nullptr;
_shared_state->partition_sorts.clear();
return PipelineXLocalState<PartitionSortDependency>::close(state);
@@ -42,6 +44,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
SourceState& source_state) {
RETURN_IF_CANCELLED(state);
CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ SCOPED_TIMER(local_state.profile()->total_time_counter());
output_block->clear_column_data();
{
std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h
b/be/src/pipeline/exec/partition_sort_source_operator.h
index 859e2d8b58..01abff61f4 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -61,6 +61,8 @@ public:
Status init(RuntimeState* state, LocalStateInfo& info) override {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state,
info));
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->previous_row =
std::make_unique<vectorized::SortCursorCmp>();
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 92f2b0b9da..cb6c8f1e3a 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -698,6 +698,9 @@ public:
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
_async_writer_dependency =
AsyncWriterDependency::create_shared(_parent->id());
_writer->set_dependency(_async_writer_dependency.get());
+
+ _wait_for_dependency_timer = ADD_TIMER(
+ _profile, "WaitForDependency[" +
_async_writer_dependency->name() + "]Time");
return Status::OK();
}
@@ -717,6 +720,8 @@ public:
if (_closed) {
return Status::OK();
}
+ COUNTER_SET(_wait_for_dependency_timer,
+ _async_writer_dependency->write_watcher_elapse_time());
if (_writer->need_normal_close()) {
if (exec_status.ok() && !state->is_cancelled()) {
RETURN_IF_ERROR(_writer->commit_trans());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]