This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7278a86c209 [profile](pipeline) Add key metrics for pipeline initialization (#35073) (#44738)
7278a86c209 is described below
commit 7278a86c209c2aadab77612bc679bd8f1b3a869e
Author: Gabriel <[email protected]>
AuthorDate: Mon Dec 2 16:02:19 2024 +0800
[profile](pipeline) Add key metrics for pipeline initialization (#35073) (#44738)
pick #35073
---
.../pipeline_x/pipeline_x_fragment_context.cpp | 155 +++++++++++----------
.../pipeline_x/pipeline_x_fragment_context.h | 6 +
be/src/runtime/fragment_mgr.cpp | 8 ++
.../doris/common/profile/ExecutionProfile.java | 5 +-
gensrc/thrift/FrontendService.thrift | 1 +
5 files changed, 103 insertions(+), 72 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 91812bac925..860ba31097a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -201,6 +201,11 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
_runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
+ _build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
+ _init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
+ _plan_local_shuffle_timer = ADD_TIMER(_runtime_profile,
"PlanLocalShuffleTime");
+ _build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
+ _prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile,
"PrepareAllPipelinesTime");
auto* fragment_context = this;
@@ -209,75 +214,83 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
.tag("fragment_id", _fragment_id)
.tag("pthread_id", (uintptr_t)pthread_self());
- if (request.query_options.__isset.is_report_success) {
-
fragment_context->set_is_report_success(request.query_options.is_report_success);
- }
-
- // 1. Set up the global runtime state.
- _runtime_state = RuntimeState::create_unique(request.query_id,
request.fragment_id,
- request.query_options,
_query_ctx->query_globals,
- _exec_env, _query_ctx.get());
-
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
- if (request.__isset.backend_id) {
- _runtime_state->set_backend_id(request.backend_id);
- }
- if (request.__isset.import_label) {
- _runtime_state->set_import_label(request.import_label);
- }
- if (request.__isset.db_name) {
- _runtime_state->set_db_name(request.db_name);
- }
- if (request.__isset.load_job_id) {
- _runtime_state->set_load_job_id(request.load_job_id);
- }
-
- if (request.is_simplified_param) {
- _desc_tbl = _query_ctx->desc_tbl;
- } else {
- DCHECK(request.__isset.desc_tbl);
- RETURN_IF_ERROR(
- DescriptorTbl::create(_runtime_state->obj_pool(),
request.desc_tbl, &_desc_tbl));
- }
- _runtime_state->set_desc_tbl(_desc_tbl);
- _runtime_state->set_num_per_fragment_instances(request.num_senders);
- _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
- _runtime_state->set_total_load_streams(request.total_load_streams);
- _runtime_state->set_num_local_sink(request.num_local_sink);
-
- const auto& local_params = request.local_params[0];
- if (local_params.__isset.runtime_filter_params) {
- _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
- local_params.runtime_filter_params);
- }
- if (local_params.__isset.topn_filter_source_node_ids) {
-
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
- } else {
- _query_ctx->init_runtime_predicates({0});
- }
+ {
+ SCOPED_TIMER(_init_context_timer);
+ if (request.query_options.__isset.is_report_success) {
+
fragment_context->set_is_report_success(request.query_options.is_report_success);
+ }
- _need_local_merge = request.__isset.parallel_instances;
+ // 1. Set up the global runtime state.
+ _runtime_state = RuntimeState::create_unique(
+ request.query_id, request.fragment_id, request.query_options,
+ _query_ctx->query_globals, _exec_env, _query_ctx.get());
+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
+ if (request.__isset.backend_id) {
+ _runtime_state->set_backend_id(request.backend_id);
+ }
+ if (request.__isset.import_label) {
+ _runtime_state->set_import_label(request.import_label);
+ }
+ if (request.__isset.db_name) {
+ _runtime_state->set_db_name(request.db_name);
+ }
+ if (request.__isset.load_job_id) {
+ _runtime_state->set_load_job_id(request.load_job_id);
+ }
- // 2. Build pipelines with operators in this fragment.
- auto root_pipeline = add_pipeline();
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
- _runtime_state->obj_pool(), request, *_query_ctx->desc_tbl,
&_root_op, root_pipeline));
+ if (request.is_simplified_param) {
+ _desc_tbl = _query_ctx->desc_tbl;
+ } else {
+ DCHECK(request.__isset.desc_tbl);
+ RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(),
request.desc_tbl,
+ &_desc_tbl));
+ }
+ _runtime_state->set_desc_tbl(_desc_tbl);
+ _runtime_state->set_num_per_fragment_instances(request.num_senders);
+ _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+ _runtime_state->set_total_load_streams(request.total_load_streams);
+ _runtime_state->set_num_local_sink(request.num_local_sink);
+
+ const auto& local_params = request.local_params[0];
+ if (local_params.__isset.runtime_filter_params) {
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ local_params.runtime_filter_params);
+ }
+ if (local_params.__isset.topn_filter_source_node_ids) {
+
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+ } else {
+ _query_ctx->init_runtime_predicates({0});
+ }
- // 3. Create sink operator
- if (!request.fragment.__isset.output_sink) {
- return Status::InternalError("No output sink in this fragment!");
+ _need_local_merge = request.__isset.parallel_instances;
}
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
- _runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs,
- request, root_pipeline->output_row_desc(), _runtime_state.get(),
*_desc_tbl,
- root_pipeline->id()));
- RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
- RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
- for (PipelinePtr& pipeline : _pipelines) {
- DCHECK(pipeline->sink_x() != nullptr) <<
pipeline->operator_xs().size();
-
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+ {
+ SCOPED_TIMER(_build_pipelines_timer);
+ // 2. Build pipelines with operators in this fragment.
+ auto root_pipeline = add_pipeline();
+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(),
request,
+
*_query_ctx->desc_tbl, &_root_op,
+ root_pipeline));
+
+ // 3. Create sink operator
+ if (!request.fragment.__isset.output_sink) {
+ return Status::InternalError("No output sink in this fragment!");
+ }
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
+ _runtime_state->obj_pool(), request.fragment.output_sink,
+ request.fragment.output_exprs, request,
root_pipeline->output_row_desc(),
+ _runtime_state.get(), *_desc_tbl, root_pipeline->id()));
+ RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
+ RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
+
+ for (PipelinePtr& pipeline : _pipelines) {
+ DCHECK(pipeline->sink_x() != nullptr) <<
pipeline->operator_xs().size();
+
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+ }
}
if (_enable_local_shuffle()) {
+ SCOPED_TIMER(_plan_local_shuffle_timer);
RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
@@ -285,13 +298,15 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
+ SCOPED_TIMER(_prepare_all_pipelines_timer);
pipeline->children().clear();
RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
}
-
- // 5. Build pipeline tasks and initialize local state.
- RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
-
+ {
+ // 5. Build pipeline tasks and initialize local state.
+ SCOPED_TIMER(_build_tasks_timer);
+ RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
+ }
_init_next_report_time();
_prepared = true;
@@ -1568,10 +1583,10 @@ Status PipelineXFragmentContext::send_report(bool done)
{
}
}
return _report_status_cb(
- {true, exec_status, runtime_states, nullptr,
_runtime_state->load_channel_profile(),
- done || !exec_status.ok(), _query_ctx->coord_addr, _query_id,
_fragment_id,
- TUniqueId(), _backend_num, _runtime_state.get(),
- [this](Status st) { return update_status(st); },
+ {true, exec_status, runtime_states, _runtime_profile.get(),
+ _runtime_state->load_channel_profile(), done || !exec_status.ok(),
+ _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(),
_backend_num,
+ _runtime_state.get(), [this](Status st) { return
update_status(st); },
[this](const PPlanFragmentCancelReason& reason, const
std::string& msg) {
cancel(reason, msg);
}},
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 60ac22a429e..c3e3f5596ad 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -242,6 +242,12 @@ private:
int _total_instances = -1;
bool _require_bucket_distribution = false;
+
+ RuntimeProfile::Counter* _init_context_timer = nullptr;
+ RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
+ RuntimeProfile::Counter* _plan_local_shuffle_timer = nullptr;
+ RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
+ RuntimeProfile::Counter* _build_tasks_timer = nullptr;
};
} // namespace pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 334b413b44b..f43190ebb36 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -359,6 +359,14 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
}
if (enable_profile) {
+ DCHECK(req.profile != nullptr);
+ TDetailedReportParams detailed_param;
+ detailed_param.__isset.fragment_instance_id = false;
+ detailed_param.__isset.profile = true;
+ detailed_param.__isset.loadChannelProfile = false;
+ detailed_param.__set_is_fragment_level(true);
+ req.profile->to_thrift(&detailed_param.profile);
+ params.detailed_report.push_back(detailed_param);
for (auto& pipeline_profile :
req.runtime_state->pipeline_id_to_profile()) {
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index d1db6ff43e2..054996d6fb1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -260,9 +260,10 @@ public class ExecutionProfile {
if (isPipelineXProfile) {
int pipelineIdx = 0;
List<RuntimeProfile> taskProfile = Lists.newArrayList();
+ String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
for (TDetailedReportParams param : params.detailed_report) {
- String name = "Pipeline :" + pipelineIdx + " "
- + " (host=" + backend.getHeartbeatAddress() + ")";
+ String name = param.isSetIsFragmentLevel() &&
param.is_fragment_level ? "Fragment Level Profile: "
+ + suffix : "Pipeline :" + pipelineIdx + " " + suffix;
RuntimeProfile profile = new RuntimeProfile(name);
taskProfile.add(profile);
if (param.isSetProfile()) {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index f8edb5d544b..90a75d5ba94 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -396,6 +396,7 @@ struct TDetailedReportParams {
1: optional Types.TUniqueId fragment_instance_id
2: optional RuntimeProfile.TRuntimeProfileTree profile
3: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
+ 4: optional bool is_fragment_level
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]