This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7278a86c209 [profile](pipeline) Add key metrics for pipeline initialization (#35073) (#44738)
7278a86c209 is described below

commit 7278a86c209c2aadab77612bc679bd8f1b3a869e
Author: Gabriel <[email protected]>
AuthorDate: Mon Dec 2 16:02:19 2024 +0800

    [profile](pipeline) Add key metrics for pipeline initialization (#35073) (#44738)
    
    pick #35073
---
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 155 +++++++++++----------
 .../pipeline_x/pipeline_x_fragment_context.h       |   6 +
 be/src/runtime/fragment_mgr.cpp                    |   8 ++
 .../doris/common/profile/ExecutionProfile.java     |   5 +-
 gensrc/thrift/FrontendService.thrift               |   1 +
 5 files changed, 103 insertions(+), 72 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 91812bac925..860ba31097a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -201,6 +201,11 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
     _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
     SCOPED_TIMER(_prepare_timer);
+    _build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
+    _init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
+    _plan_local_shuffle_timer = ADD_TIMER(_runtime_profile, 
"PlanLocalShuffleTime");
+    _build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
+    _prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile, 
"PrepareAllPipelinesTime");
 
     auto* fragment_context = this;
 
@@ -209,75 +214,83 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
             .tag("fragment_id", _fragment_id)
             .tag("pthread_id", (uintptr_t)pthread_self());
 
-    if (request.query_options.__isset.is_report_success) {
-        
fragment_context->set_is_report_success(request.query_options.is_report_success);
-    }
-
-    // 1. Set up the global runtime state.
-    _runtime_state = RuntimeState::create_unique(request.query_id, 
request.fragment_id,
-                                                 request.query_options, 
_query_ctx->query_globals,
-                                                 _exec_env, _query_ctx.get());
-    
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
-    if (request.__isset.backend_id) {
-        _runtime_state->set_backend_id(request.backend_id);
-    }
-    if (request.__isset.import_label) {
-        _runtime_state->set_import_label(request.import_label);
-    }
-    if (request.__isset.db_name) {
-        _runtime_state->set_db_name(request.db_name);
-    }
-    if (request.__isset.load_job_id) {
-        _runtime_state->set_load_job_id(request.load_job_id);
-    }
-
-    if (request.is_simplified_param) {
-        _desc_tbl = _query_ctx->desc_tbl;
-    } else {
-        DCHECK(request.__isset.desc_tbl);
-        RETURN_IF_ERROR(
-                DescriptorTbl::create(_runtime_state->obj_pool(), 
request.desc_tbl, &_desc_tbl));
-    }
-    _runtime_state->set_desc_tbl(_desc_tbl);
-    _runtime_state->set_num_per_fragment_instances(request.num_senders);
-    _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
-    _runtime_state->set_total_load_streams(request.total_load_streams);
-    _runtime_state->set_num_local_sink(request.num_local_sink);
-
-    const auto& local_params = request.local_params[0];
-    if (local_params.__isset.runtime_filter_params) {
-        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
-                local_params.runtime_filter_params);
-    }
-    if (local_params.__isset.topn_filter_source_node_ids) {
-        
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
-    } else {
-        _query_ctx->init_runtime_predicates({0});
-    }
+    {
+        SCOPED_TIMER(_init_context_timer);
+        if (request.query_options.__isset.is_report_success) {
+            
fragment_context->set_is_report_success(request.query_options.is_report_success);
+        }
 
-    _need_local_merge = request.__isset.parallel_instances;
+        // 1. Set up the global runtime state.
+        _runtime_state = RuntimeState::create_unique(
+                request.query_id, request.fragment_id, request.query_options,
+                _query_ctx->query_globals, _exec_env, _query_ctx.get());
+        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
+        if (request.__isset.backend_id) {
+            _runtime_state->set_backend_id(request.backend_id);
+        }
+        if (request.__isset.import_label) {
+            _runtime_state->set_import_label(request.import_label);
+        }
+        if (request.__isset.db_name) {
+            _runtime_state->set_db_name(request.db_name);
+        }
+        if (request.__isset.load_job_id) {
+            _runtime_state->set_load_job_id(request.load_job_id);
+        }
 
-    // 2. Build pipelines with operators in this fragment.
-    auto root_pipeline = add_pipeline();
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
-            _runtime_state->obj_pool(), request, *_query_ctx->desc_tbl, 
&_root_op, root_pipeline));
+        if (request.is_simplified_param) {
+            _desc_tbl = _query_ctx->desc_tbl;
+        } else {
+            DCHECK(request.__isset.desc_tbl);
+            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), 
request.desc_tbl,
+                                                  &_desc_tbl));
+        }
+        _runtime_state->set_desc_tbl(_desc_tbl);
+        _runtime_state->set_num_per_fragment_instances(request.num_senders);
+        _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+        _runtime_state->set_total_load_streams(request.total_load_streams);
+        _runtime_state->set_num_local_sink(request.num_local_sink);
+
+        const auto& local_params = request.local_params[0];
+        if (local_params.__isset.runtime_filter_params) {
+            _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                    local_params.runtime_filter_params);
+        }
+        if (local_params.__isset.topn_filter_source_node_ids) {
+            
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+        } else {
+            _query_ctx->init_runtime_predicates({0});
+        }
 
-    // 3. Create sink operator
-    if (!request.fragment.__isset.output_sink) {
-        return Status::InternalError("No output sink in this fragment!");
+        _need_local_merge = request.__isset.parallel_instances;
     }
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
-            _runtime_state->obj_pool(), request.fragment.output_sink, 
request.fragment.output_exprs,
-            request, root_pipeline->output_row_desc(), _runtime_state.get(), 
*_desc_tbl,
-            root_pipeline->id()));
-    RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
-    RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
 
-    for (PipelinePtr& pipeline : _pipelines) {
-        DCHECK(pipeline->sink_x() != nullptr) << 
pipeline->operator_xs().size();
-        
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+    {
+        SCOPED_TIMER(_build_pipelines_timer);
+        // 2. Build pipelines with operators in this fragment.
+        auto root_pipeline = add_pipeline();
+        
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(), 
request,
+                                                            
*_query_ctx->desc_tbl, &_root_op,
+                                                            root_pipeline));
+
+        // 3. Create sink operator
+        if (!request.fragment.__isset.output_sink) {
+            return Status::InternalError("No output sink in this fragment!");
+        }
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
+                _runtime_state->obj_pool(), request.fragment.output_sink,
+                request.fragment.output_exprs, request, 
root_pipeline->output_row_desc(),
+                _runtime_state.get(), *_desc_tbl, root_pipeline->id()));
+        RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
+        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
+
+        for (PipelinePtr& pipeline : _pipelines) {
+            DCHECK(pipeline->sink_x() != nullptr) << 
pipeline->operator_xs().size();
+            
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+        }
     }
     if (_enable_local_shuffle()) {
+        SCOPED_TIMER(_plan_local_shuffle_timer);
         RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
                                              
request.bucket_seq_to_instance_idx,
                                              
request.shuffle_idx_to_instance_idx));
@@ -285,13 +298,15 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
 
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
+        SCOPED_TIMER(_prepare_all_pipelines_timer);
         pipeline->children().clear();
         RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
     }
-
-    // 5. Build pipeline tasks and initialize local state.
-    RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
-
+    {
+        // 5. Build pipeline tasks and initialize local state.
+        SCOPED_TIMER(_build_tasks_timer);
+        RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
+    }
     _init_next_report_time();
 
     _prepared = true;
@@ -1568,10 +1583,10 @@ Status PipelineXFragmentContext::send_report(bool done) 
{
         }
     }
     return _report_status_cb(
-            {true, exec_status, runtime_states, nullptr, 
_runtime_state->load_channel_profile(),
-             done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, 
_fragment_id,
-             TUniqueId(), _backend_num, _runtime_state.get(),
-             [this](Status st) { return update_status(st); },
+            {true, exec_status, runtime_states, _runtime_profile.get(),
+             _runtime_state->load_channel_profile(), done || !exec_status.ok(),
+             _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), 
_backend_num,
+             _runtime_state.get(), [this](Status st) { return 
update_status(st); },
              [this](const PPlanFragmentCancelReason& reason, const 
std::string& msg) {
                  cancel(reason, msg);
              }},
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 60ac22a429e..c3e3f5596ad 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -242,6 +242,12 @@ private:
     int _total_instances = -1;
 
     bool _require_bucket_distribution = false;
+
+    RuntimeProfile::Counter* _init_context_timer = nullptr;
+    RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
+    RuntimeProfile::Counter* _plan_local_shuffle_timer = nullptr;
+    RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
+    RuntimeProfile::Counter* _build_tasks_timer = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 334b413b44b..f43190ebb36 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -359,6 +359,14 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
             }
 
             if (enable_profile) {
+                DCHECK(req.profile != nullptr);
+                TDetailedReportParams detailed_param;
+                detailed_param.__isset.fragment_instance_id = false;
+                detailed_param.__isset.profile = true;
+                detailed_param.__isset.loadChannelProfile = false;
+                detailed_param.__set_is_fragment_level(true);
+                req.profile->to_thrift(&detailed_param.profile);
+                params.detailed_report.push_back(detailed_param);
                 for (auto& pipeline_profile : 
req.runtime_state->pipeline_id_to_profile()) {
                     TDetailedReportParams detailed_param;
                     detailed_param.__isset.fragment_instance_id = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index d1db6ff43e2..054996d6fb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -260,9 +260,10 @@ public class ExecutionProfile {
         if (isPipelineXProfile) {
             int pipelineIdx = 0;
             List<RuntimeProfile> taskProfile = Lists.newArrayList();
+            String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
             for (TDetailedReportParams param : params.detailed_report) {
-                String name = "Pipeline :" + pipelineIdx + " "
-                        + " (host=" + backend.getHeartbeatAddress() + ")";
+                String name = param.isSetIsFragmentLevel() && 
param.is_fragment_level ? "Fragment Level Profile: "
+                        + suffix : "Pipeline :" + pipelineIdx + " " + suffix;
                 RuntimeProfile profile = new RuntimeProfile(name);
                 taskProfile.add(profile);
                 if (param.isSetProfile()) {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index f8edb5d544b..90a75d5ba94 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -396,6 +396,7 @@ struct TDetailedReportParams {
   1: optional Types.TUniqueId fragment_instance_id
   2: optional RuntimeProfile.TRuntimeProfileTree profile
   3: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
+  4: optional bool is_fragment_level
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to