This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4163a796693 [pipelineX](profile) make dep time merge (#28458)
4163a796693 is described below
commit 4163a796693ec43408fdf776624794cbec9c5212
Author: Mryange <[email protected]>
AuthorDate: Tue Dec 19 10:27:02 2023 +0800
[pipelineX](profile) make dep time merge (#28458)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 5 +++--
be/src/pipeline/exec/exchange_source_operator.cpp | 5 +++--
be/src/pipeline/exec/result_sink_operator.cpp | 2 +-
be/src/pipeline/exec/scan_operator.cpp | 9 +++++----
be/src/pipeline/pipeline_x/dependency.h | 10 ----------
be/src/pipeline/pipeline_x/operator.cpp | 12 ++++++------
6 files changed, 18 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index be3606433ec..165576d1ab2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -127,8 +127,9 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent",
TUnit::BYTES);
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
static const std::string timer_name = "WaitForDependencyTime";
- _wait_for_dependency_timer = ADD_TIMER(_profile, timer_name);
- _wait_queue_timer = ADD_CHILD_TIMER(_profile, "WaitForRpcBufferQueue",
timer_name);
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
+ _wait_queue_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(_profile, "WaitForRpcBufferQueue",
timer_name, 1);
auto& p = _parent->cast<ExchangeSinkOperatorX>();
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index f64ebcddcac..41e57fbde79 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -88,9 +88,10 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
}
static const std::string timer_name =
"WaitForDependency[" + source_dependency->name() + "]Time";
- _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile,
timer_name, 1);
for (size_t i = 0; i < queues.size(); i++) {
- metrics[i] = ADD_CHILD_TIMER(_runtime_profile,
fmt::format("WaitForData{}", i), timer_name);
+ metrics[i] = ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile,
fmt::format("WaitForData{}", i),
+ timer_name, 1);
}
RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
state, vsort_exec_exprs));
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 8c314b995bb..33256dc7f00 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -55,7 +55,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
static const std::string timer_name = "WaitForDependencyTime";
- _wait_for_dependency_timer = ADD_TIMER(_profile, timer_name);
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
auto fragment_instance_id = state->fragment_instance_id();
// create sender
std::shared_ptr<BufferControlBlock> sender = nullptr;
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index a75366e9893..d8d3958a779 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -148,11 +148,12 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
_prepare_rf_timer(_runtime_profile.get());
static const std::string timer_name = "WaitForDependencyTime";
- _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
- _wait_for_data_timer = ADD_CHILD_TIMER(_runtime_profile, "WaitForData",
timer_name);
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile,
timer_name, 1);
+ _wait_for_data_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile, "WaitForData",
timer_name, 1);
_wait_for_scanner_done_timer =
- ADD_CHILD_TIMER(_runtime_profile, "WaitForScannerDone",
timer_name);
- _wait_for_eos_timer = ADD_CHILD_TIMER(_runtime_profile, "WaitForEos",
timer_name);
+ ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile, "WaitForScannerDone",
timer_name, 1);
+ _wait_for_eos_timer = ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile,
"WaitForEos", timer_name, 1);
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 2e917cd4f5b..0aebc472637 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -276,16 +276,6 @@ public:
AndDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "AndDependency", query_ctx) {}
- [[nodiscard]] std::string name() const override {
- fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}[", Dependency::_name);
- for (auto& child : Dependency::_children) {
- fmt::format_to(debug_string_buffer, "{}, ", child->name());
- }
- fmt::format_to(debug_string_buffer, "]");
- return fmt::to_string(debug_string_buffer);
- }
-
std::string debug_string(int indentation_level = 0) override;
[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 0ea5ea60fbc..a466b8b3b67 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -346,8 +346,8 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
}
_shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();
- _wait_for_dependency_timer =
- ADD_TIMER(_runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time");
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _runtime_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
_shared_state->source_dep = _dependency;
_shared_state->sink_dep = deps.front().get();
}
@@ -414,8 +414,8 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
if (_dependency) {
_shared_state =
(typename
DependencyType::SharedState*)_dependency->shared_state().get();
- _wait_for_dependency_timer =
- ADD_TIMER(_profile, "WaitForDependency[" +
_dependency->name() + "]Time");
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
}
_shared_state->ref();
} else {
@@ -511,8 +511,8 @@ Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkState
_parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
_writer->set_dependency(_async_writer_dependency.get(),
_finish_dependency.get());
- _wait_for_dependency_timer =
- ADD_TIMER(_profile, "WaitForDependency[" +
_async_writer_dependency->name() + "]Time");
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _profile, "WaitForDependency[" + _async_writer_dependency->name()
+ "]Time", 1);
_finish_dependency->block();
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]