This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b5ca106efd [refine](profileV2) use task dependency in profile and 
print pipelinetask index (#26059)
4b5ca106efd is described below

commit 4b5ca106efda8b5968f2a4e08d063be32f004686
Author: Mryange <[email protected]>
AuthorDate: Mon Oct 30 18:40:04 2023 +0800

    [refine](profileV2) use task dependency in profile and print pipelinetask 
index (#26059)
---
 be/src/pipeline/pipeline_task.h                    |  2 ++
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 26 ++++++++++++++++++++--
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  2 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  6 +++++
 4 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index ef923868a51..690c4e3419d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -249,6 +249,8 @@ public:
 
     TUniqueId instance_id() const { return _state->fragment_instance_id(); }
 
+    void set_parent_profile(RuntimeProfile* profile) { _parent_profile = 
profile; }
+
 protected:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 5ca5829e1a8..06659eb23e0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -450,6 +450,29 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
          * and JoinProbeOperator2.
          */
 
+        // First, set up the parent profile,
+        // then prepare the task profile and add it to 
operator_id_to_task_profile.
+        std::vector<RuntimeProfile*> operator_id_to_task_profile(
+                max_operator_id(), _runtime_states[i]->runtime_profile());
+        auto prepare_and_set_parent_profile = [&](PipelineXTask* task) {
+            auto sink = task->sink();
+            const auto& dests_id = sink->dests_id();
+            int dest_id = dests_id.front();
+            DCHECK(dest_id < operator_id_to_task_profile.size());
+            task->set_parent_profile(operator_id_to_task_profile[dest_id]);
+
+            RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), 
local_params,
+                                          request.fragment.output_sink));
+
+            for (auto o : task->operatorXs()) {
+                int id = o->operator_id();
+                DCHECK(id < operator_id_to_task_profile.size());
+                auto* op_local_state = 
_runtime_states[i].get()->get_local_state(o->operator_id());
+                operator_id_to_task_profile[id] = op_local_state->profile();
+            }
+            return Status::OK();
+        };
+
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
             DCHECK(task != nullptr);
@@ -462,8 +485,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                             
pipeline_id_to_task[dep]->get_downstream_dependency());
                 }
             }
-            RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), 
local_params,
-                                          request.fragment.output_sink));
+            RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
         }
 
         {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 9ad91341ce5..140fecbb1d5 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -135,7 +135,7 @@ Status PipelineXTask::extract_dependencies() {
 
 void PipelineXTask::_init_profile() {
     std::stringstream ss;
-    ss << "PipelineTask"
+    ss << "PipelineXTask"
        << " (index=" << _index << ")";
     auto* task_profile = new RuntimeProfile(ss.str());
     _parent_profile->add_child(task_profile, true, nullptr);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 5155c2fe766..3a33431192e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -151,6 +151,12 @@ public:
 
     void push_blocked_task_to_dependency(Dependency* dep) {}
 
+    DataSinkOperatorXPtr sink() const { return _sink; }
+
+    OperatorXPtr source() const { return _source; }
+
+    OperatorXs operatorXs() { return _operators; }
+
 private:
     void set_close_pipeline_time() override {}
     void _init_profile() override;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to