This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a0de08255d [pipelineX](profile) Add necessary metrics (#24820)
a0de08255d is described below

commit a0de08255d78feac8d6067c7d7c8a1ba1fb7c655
Author: Gabriel <[email protected]>
AuthorDate: Sat Sep 23 19:27:45 2023 +0800

    [pipelineX](profile) Add necessary metrics (#24820)
---
 be/src/pipeline/exec/multi_cast_data_stream_source.h    | 2 +-
 be/src/pipeline/exec/partition_sort_source_operator.cpp | 3 +++
 be/src/pipeline/exec/partition_sort_source_operator.h   | 2 ++
 be/src/pipeline/pipeline_x/operator.h                   | 5 +++++
 4 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index e9093cff58..aa20272d07 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -177,7 +177,6 @@ class MultiCastDataStreamSinkLocalState final
     ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
     MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
             : Base(parent, state) {}
-    std::shared_ptr<pipeline::MultiCastDataStreamer> 
multi_cast_data_streamer();
     friend class MultiCastDataStreamSinkOperatorX;
     friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
     using Base = PipelineXSinkLocalState<MultiCastDependency>;
@@ -209,6 +208,7 @@ public:
                 SourceState source_state) override {
         CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
         SCOPED_TIMER(local_state.profile()->total_time_counter());
+        COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
         if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
             COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
             auto st = 
local_state._shared_state->_multi_cast_data_streamer->push(
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp 
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 52d3470413..25ea12e203 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -33,6 +33,8 @@ Status PartitionSortSourceLocalState::close(RuntimeState* 
state) {
     if (_closed) {
         return Status::OK();
     }
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_close_timer);
     _shared_state->previous_row = nullptr;
     _shared_state->partition_sorts.clear();
     return PipelineXLocalState<PartitionSortDependency>::close(state);
@@ -42,6 +44,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::
                                                SourceState& source_state) {
     RETURN_IF_CANCELLED(state);
     CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    SCOPED_TIMER(local_state.profile()->total_time_counter());
     output_block->clear_column_data();
     {
         std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h 
b/be/src/pipeline/exec/partition_sort_source_operator.h
index 859e2d8b58..01abff61f4 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -61,6 +61,8 @@ public:
 
     Status init(RuntimeState* state, LocalStateInfo& info) override {
         
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state, 
info));
+        SCOPED_TIMER(profile()->total_time_counter());
+        SCOPED_TIMER(_open_timer);
         _get_next_timer = ADD_TIMER(profile(), "GetResultTime");
         _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
         _shared_state->previous_row = 
std::make_unique<vectorized::SortCursorCmp>();
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 92f2b0b9da..cb6c8f1e3a 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -698,6 +698,9 @@ public:
         _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
         _async_writer_dependency = 
AsyncWriterDependency::create_shared(_parent->id());
         _writer->set_dependency(_async_writer_dependency.get());
+
+        _wait_for_dependency_timer = ADD_TIMER(
+                _profile, "WaitForDependency[" + 
_async_writer_dependency->name() + "]Time");
         return Status::OK();
     }
 
@@ -717,6 +720,8 @@ public:
         if (_closed) {
             return Status::OK();
         }
+        COUNTER_SET(_wait_for_dependency_timer,
+                    _async_writer_dependency->write_watcher_elapse_time());
         if (_writer->need_normal_close()) {
             if (exec_status.ok() && !state->is_cancelled()) {
                 RETURN_IF_ERROR(_writer->commit_trans());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to