This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 454cdffefce [profile](pipeline) Add key metrics for pipeline initialization (#35073)
454cdffefce is described below

commit 454cdffefce1d2854e1d970468c3b27b4878da8d
Author: Gabriel <[email protected]>
AuthorDate: Wed May 22 09:29:47 2024 +0800

    [profile](pipeline) Add key metrics for pipeline initialization (#35073)
---
 be/src/pipeline/pipeline_fragment_context.cpp      | 162 ++++++++++++---------
 be/src/pipeline/pipeline_fragment_context.h        |   5 +
 be/src/runtime/fragment_mgr.cpp                    |   8 +
 .../doris/common/profile/ExecutionProfile.java     |   5 +-
 gensrc/thrift/FrontendService.thrift               |   1 +
 5 files changed, 107 insertions(+), 74 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index c1c939ca88c..54ab15041b0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -221,89 +221,103 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     if (_prepared) {
         return Status::InternalError("Already prepared");
     }
-    _num_instances = request.local_params.size();
-    _total_instances = request.__isset.total_instances ? 
request.total_instances : _num_instances;
     _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
     _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
     SCOPED_TIMER(_prepare_timer);
+    _build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
+    _init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
+    _plan_local_shuffle_timer = ADD_TIMER(_runtime_profile, 
"PlanLocalShuffleTime");
+    _build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
+    _prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile, 
"PrepareAllPipelinesTime");
+    {
+        SCOPED_TIMER(_init_context_timer);
+        _num_instances = request.local_params.size();
+        _total_instances =
+                request.__isset.total_instances ? request.total_instances : 
_num_instances;
 
-    auto* fragment_context = this;
-
-    LOG_INFO("PipelineFragmentContext::prepare")
-            .tag("query_id", print_id(_query_id))
-            .tag("fragment_id", _fragment_id)
-            .tag("pthread_id", (uintptr_t)pthread_self());
-
-    if (request.query_options.__isset.is_report_success) {
-        
fragment_context->set_is_report_success(request.query_options.is_report_success);
-    }
+        auto* fragment_context = this;
 
-    // 1. Set up the global runtime state.
-    _runtime_state = RuntimeState::create_unique(request.query_id, 
request.fragment_id,
-                                                 request.query_options, 
_query_ctx->query_globals,
-                                                 _exec_env, _query_ctx.get());
+        LOG_INFO("PipelineFragmentContext::prepare")
+                .tag("query_id", print_id(_query_id))
+                .tag("fragment_id", _fragment_id)
+                .tag("pthread_id", (uintptr_t)pthread_self());
 
-    
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
-    if (request.__isset.backend_id) {
-        _runtime_state->set_backend_id(request.backend_id);
-    }
-    if (request.__isset.import_label) {
-        _runtime_state->set_import_label(request.import_label);
-    }
-    if (request.__isset.db_name) {
-        _runtime_state->set_db_name(request.db_name);
-    }
-    if (request.__isset.load_job_id) {
-        _runtime_state->set_load_job_id(request.load_job_id);
-    }
+        if (request.query_options.__isset.is_report_success) {
+            
fragment_context->set_is_report_success(request.query_options.is_report_success);
+        }
 
-    if (request.is_simplified_param) {
-        _desc_tbl = _query_ctx->desc_tbl;
-    } else {
-        DCHECK(request.__isset.desc_tbl);
-        RETURN_IF_ERROR(
-                DescriptorTbl::create(_runtime_state->obj_pool(), 
request.desc_tbl, &_desc_tbl));
-    }
-    _runtime_state->set_desc_tbl(_desc_tbl);
-    _runtime_state->set_num_per_fragment_instances(request.num_senders);
-    _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
-    _runtime_state->set_total_load_streams(request.total_load_streams);
-    _runtime_state->set_num_local_sink(request.num_local_sink);
-
-    const auto& local_params = request.local_params[0];
-    if (local_params.__isset.runtime_filter_params) {
-        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
-                local_params.runtime_filter_params);
-    }
-    if (local_params.__isset.topn_filter_source_node_ids) {
-        
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
-    } else {
-        _query_ctx->init_runtime_predicates({0});
-    }
+        // 1. Set up the global runtime state.
+        _runtime_state = RuntimeState::create_unique(
+                request.query_id, request.fragment_id, request.query_options,
+                _query_ctx->query_globals, _exec_env, _query_ctx.get());
 
-    _need_local_merge = request.__isset.parallel_instances;
+        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
+        if (request.__isset.backend_id) {
+            _runtime_state->set_backend_id(request.backend_id);
+        }
+        if (request.__isset.import_label) {
+            _runtime_state->set_import_label(request.import_label);
+        }
+        if (request.__isset.db_name) {
+            _runtime_state->set_db_name(request.db_name);
+        }
+        if (request.__isset.load_job_id) {
+            _runtime_state->set_load_job_id(request.load_job_id);
+        }
 
-    // 2. Build pipelines with operators in this fragment.
-    auto root_pipeline = add_pipeline();
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
-            _runtime_state->obj_pool(), request, *_query_ctx->desc_tbl, 
&_root_op, root_pipeline));
+        if (request.is_simplified_param) {
+            _desc_tbl = _query_ctx->desc_tbl;
+        } else {
+            DCHECK(request.__isset.desc_tbl);
+            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), 
request.desc_tbl,
+                                                  &_desc_tbl));
+        }
+        _runtime_state->set_desc_tbl(_desc_tbl);
+        _runtime_state->set_num_per_fragment_instances(request.num_senders);
+        _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+        _runtime_state->set_total_load_streams(request.total_load_streams);
+        _runtime_state->set_num_local_sink(request.num_local_sink);
+
+        const auto& local_params = request.local_params[0];
+        if (local_params.__isset.runtime_filter_params) {
+            _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                    local_params.runtime_filter_params);
+        }
+        if (local_params.__isset.topn_filter_source_node_ids) {
+            
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+        } else {
+            _query_ctx->init_runtime_predicates({0});
+        }
 
-    // 3. Create sink operator
-    if (!request.fragment.__isset.output_sink) {
-        return Status::InternalError("No output sink in this fragment!");
+        _need_local_merge = request.__isset.parallel_instances;
     }
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
-            _runtime_state->obj_pool(), request.fragment.output_sink, 
request.fragment.output_exprs,
-            request, root_pipeline->output_row_desc(), _runtime_state.get(), 
*_desc_tbl,
-            root_pipeline->id()));
-    RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
-    RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
 
-    for (PipelinePtr& pipeline : _pipelines) {
-        DCHECK(pipeline->sink_x() != nullptr) << 
pipeline->operator_xs().size();
-        
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+    {
+        SCOPED_TIMER(_build_pipelines_timer);
+        // 2. Build pipelines with operators in this fragment.
+        auto root_pipeline = add_pipeline();
+        
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(), 
request,
+                                                            
*_query_ctx->desc_tbl, &_root_op,
+                                                            root_pipeline));
+
+        // 3. Create sink operator
+        if (!request.fragment.__isset.output_sink) {
+            return Status::InternalError("No output sink in this fragment!");
+        }
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
+                _runtime_state->obj_pool(), request.fragment.output_sink,
+                request.fragment.output_exprs, request, 
root_pipeline->output_row_desc(),
+                _runtime_state.get(), *_desc_tbl, root_pipeline->id()));
+        RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
+        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
+
+        for (PipelinePtr& pipeline : _pipelines) {
+            DCHECK(pipeline->sink_x() != nullptr) << 
pipeline->operator_xs().size();
+            
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+        }
     }
     if (_enable_local_shuffle()) {
+        SCOPED_TIMER(_plan_local_shuffle_timer);
         RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
                                              
request.bucket_seq_to_instance_idx,
                                              
request.shuffle_idx_to_instance_idx));
@@ -311,12 +325,16 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
 
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
+        SCOPED_TIMER(_prepare_all_pipelines_timer);
         pipeline->children().clear();
         RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
     }
 
-    // 5. Build pipeline tasks and initialize local state.
-    RETURN_IF_ERROR(_build_pipeline_tasks(request));
+    {
+        SCOPED_TIMER(_build_tasks_timer);
+        // 5. Build pipeline tasks and initialize local state.
+        RETURN_IF_ERROR(_build_pipeline_tasks(request));
+    }
 
     _init_next_report_time();
 
@@ -1560,7 +1578,7 @@ Status PipelineFragmentContext::send_report(bool done) {
     ReportStatusRequest req {true,
                              exec_status,
                              runtime_states,
-                             nullptr,
+                             _runtime_profile.get(),
                              _runtime_state->load_channel_profile(),
                              done || !exec_status.ok(),
                              _query_ctx->coord_addr,
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index f4e324b6f53..1044282289b 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -218,6 +218,11 @@ private:
 
     MonotonicStopWatch _fragment_watcher;
     RuntimeProfile::Counter* _prepare_timer = nullptr;
+    RuntimeProfile::Counter* _init_context_timer = nullptr;
+    RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
+    RuntimeProfile::Counter* _plan_local_shuffle_timer = nullptr;
+    RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
+    RuntimeProfile::Counter* _build_tasks_timer = nullptr;
 
     std::function<void(RuntimeState*, Status*)> _call_back;
     bool _is_fragment_instance_closed = false;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 959b85bb537..40013fb33dd 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -267,6 +267,14 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
             }
 
             if (enable_profile) {
+                DCHECK(req.profile != nullptr);
+                TDetailedReportParams detailed_param;
+                detailed_param.__isset.fragment_instance_id = false;
+                detailed_param.__isset.profile = true;
+                detailed_param.__isset.loadChannelProfile = false;
+                detailed_param.__set_is_fragment_level(true);
+                req.profile->to_thrift(&detailed_param.profile);
+                params.detailed_report.push_back(detailed_param);
                 for (auto pipeline_profile : 
req.runtime_state->pipeline_id_to_profile()) {
                     TDetailedReportParams detailed_param;
                     detailed_param.__isset.fragment_instance_id = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index dc91210eeb7..76420045be6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -340,9 +340,10 @@ public class ExecutionProfile {
         if (isPipelineXProfile) {
             int pipelineIdx = 0;
             List<RuntimeProfile> taskProfile = Lists.newArrayList();
+            String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
             for (TDetailedReportParams param : params.detailed_report) {
-                String name = "Pipeline :" + pipelineIdx + " "
-                        + " (host=" + backend.getHeartbeatAddress() + ")";
+                String name = param.isSetIsFragmentLevel() && 
param.is_fragment_level ? "Fragment Level Profile: "
+                        + suffix : "Pipeline :" + pipelineIdx + " " + suffix;
                 RuntimeProfile profile = new RuntimeProfile(name);
                 taskProfile.add(profile);
                 if (param.isSetProfile()) {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index bd095db9dbc..aea1fe7604e 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -395,6 +395,7 @@ struct TDetailedReportParams {
   1: optional Types.TUniqueId fragment_instance_id
   2: optional RuntimeProfile.TRuntimeProfileTree profile
   3: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
+  4: optional bool is_fragment_level
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to