This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4b5ca106efd [refine](profileV2) use task dependency in profile and
print pipelinetask index (#26059)
4b5ca106efd is described below
commit 4b5ca106efda8b5968f2a4e08d063be32f004686
Author: Mryange <[email protected]>
AuthorDate: Mon Oct 30 18:40:04 2023 +0800
[refine](profileV2) use task dependency in profile and print pipelinetask
index (#26059)
---
be/src/pipeline/pipeline_task.h | 2 ++
.../pipeline_x/pipeline_x_fragment_context.cpp | 26 ++++++++++++++++++++--
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 2 +-
be/src/pipeline/pipeline_x/pipeline_x_task.h | 6 +++++
4 files changed, 33 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index ef923868a51..690c4e3419d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -249,6 +249,8 @@ public:
TUniqueId instance_id() const { return _state->fragment_instance_id(); }
+ void set_parent_profile(RuntimeProfile* profile) { _parent_profile =
profile; }
+
protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 5ca5829e1a8..06659eb23e0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -450,6 +450,29 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
* and JoinProbeOperator2.
*/
+ // First, set up the parent profile,
+ // then prepare the task profile and add it to
operator_id_to_task_profile.
+ std::vector<RuntimeProfile*> operator_id_to_task_profile(
+ max_operator_id(), _runtime_states[i]->runtime_profile());
+ auto prepare_and_set_parent_profile = [&](PipelineXTask* task) {
+ auto sink = task->sink();
+ const auto& dests_id = sink->dests_id();
+ int dest_id = dests_id.front();
+ DCHECK(dest_id < operator_id_to_task_profile.size());
+ task->set_parent_profile(operator_id_to_task_profile[dest_id]);
+
+ RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(),
local_params,
+ request.fragment.output_sink));
+
+ for (auto o : task->operatorXs()) {
+ int id = o->operator_id();
+ DCHECK(id < operator_id_to_task_profile.size());
+ auto* op_local_state =
_runtime_states[i].get()->get_local_state(o->operator_id());
+ operator_id_to_task_profile[id] = op_local_state->profile();
+ }
+ return Status::OK();
+ };
+
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
DCHECK(task != nullptr);
@@ -462,8 +485,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
pipeline_id_to_task[dep]->get_downstream_dependency());
}
}
- RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(),
local_params,
- request.fragment.output_sink));
+ RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
}
{
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 9ad91341ce5..140fecbb1d5 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -135,7 +135,7 @@ Status PipelineXTask::extract_dependencies() {
void PipelineXTask::_init_profile() {
std::stringstream ss;
- ss << "PipelineTask"
+ ss << "PipelineXTask"
<< " (index=" << _index << ")";
auto* task_profile = new RuntimeProfile(ss.str());
_parent_profile->add_child(task_profile, true, nullptr);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 5155c2fe766..3a33431192e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -151,6 +151,12 @@ public:
void push_blocked_task_to_dependency(Dependency* dep) {}
+ DataSinkOperatorXPtr sink() const { return _sink; }
+
+ OperatorXPtr source() const { return _source; }
+
+ OperatorXs operatorXs() { return _operators; }
+
private:
void set_close_pipeline_time() override {}
void _init_profile() override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]