This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 454cdffefce [profile](pipeline) Add key metrics for pipeline initialization (#35073)
454cdffefce is described below
commit 454cdffefce1d2854e1d970468c3b27b4878da8d
Author: Gabriel <[email protected]>
AuthorDate: Wed May 22 09:29:47 2024 +0800
[profile](pipeline) Add key metrics for pipeline initialization (#35073)
---
be/src/pipeline/pipeline_fragment_context.cpp | 162 ++++++++++++---------
be/src/pipeline/pipeline_fragment_context.h | 5 +
be/src/runtime/fragment_mgr.cpp | 8 +
.../doris/common/profile/ExecutionProfile.java | 5 +-
gensrc/thrift/FrontendService.thrift | 1 +
5 files changed, 107 insertions(+), 74 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index c1c939ca88c..54ab15041b0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -221,89 +221,103 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
if (_prepared) {
return Status::InternalError("Already prepared");
}
- _num_instances = request.local_params.size();
- _total_instances = request.__isset.total_instances ?
request.total_instances : _num_instances;
_runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
+ _build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
+ _init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
+ _plan_local_shuffle_timer = ADD_TIMER(_runtime_profile,
"PlanLocalShuffleTime");
+ _build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
+ _prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile,
"PrepareAllPipelinesTime");
+ {
+ SCOPED_TIMER(_init_context_timer);
+ _num_instances = request.local_params.size();
+ _total_instances =
+ request.__isset.total_instances ? request.total_instances :
_num_instances;
- auto* fragment_context = this;
-
- LOG_INFO("PipelineFragmentContext::prepare")
- .tag("query_id", print_id(_query_id))
- .tag("fragment_id", _fragment_id)
- .tag("pthread_id", (uintptr_t)pthread_self());
-
- if (request.query_options.__isset.is_report_success) {
-
fragment_context->set_is_report_success(request.query_options.is_report_success);
- }
+ auto* fragment_context = this;
- // 1. Set up the global runtime state.
- _runtime_state = RuntimeState::create_unique(request.query_id,
request.fragment_id,
- request.query_options,
_query_ctx->query_globals,
- _exec_env, _query_ctx.get());
+ LOG_INFO("PipelineFragmentContext::prepare")
+ .tag("query_id", print_id(_query_id))
+ .tag("fragment_id", _fragment_id)
+ .tag("pthread_id", (uintptr_t)pthread_self());
-
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
- if (request.__isset.backend_id) {
- _runtime_state->set_backend_id(request.backend_id);
- }
- if (request.__isset.import_label) {
- _runtime_state->set_import_label(request.import_label);
- }
- if (request.__isset.db_name) {
- _runtime_state->set_db_name(request.db_name);
- }
- if (request.__isset.load_job_id) {
- _runtime_state->set_load_job_id(request.load_job_id);
- }
+ if (request.query_options.__isset.is_report_success) {
+
fragment_context->set_is_report_success(request.query_options.is_report_success);
+ }
- if (request.is_simplified_param) {
- _desc_tbl = _query_ctx->desc_tbl;
- } else {
- DCHECK(request.__isset.desc_tbl);
- RETURN_IF_ERROR(
- DescriptorTbl::create(_runtime_state->obj_pool(),
request.desc_tbl, &_desc_tbl));
- }
- _runtime_state->set_desc_tbl(_desc_tbl);
- _runtime_state->set_num_per_fragment_instances(request.num_senders);
- _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
- _runtime_state->set_total_load_streams(request.total_load_streams);
- _runtime_state->set_num_local_sink(request.num_local_sink);
-
- const auto& local_params = request.local_params[0];
- if (local_params.__isset.runtime_filter_params) {
- _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
- local_params.runtime_filter_params);
- }
- if (local_params.__isset.topn_filter_source_node_ids) {
-
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
- } else {
- _query_ctx->init_runtime_predicates({0});
- }
+ // 1. Set up the global runtime state.
+ _runtime_state = RuntimeState::create_unique(
+ request.query_id, request.fragment_id, request.query_options,
+ _query_ctx->query_globals, _exec_env, _query_ctx.get());
- _need_local_merge = request.__isset.parallel_instances;
+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
+ if (request.__isset.backend_id) {
+ _runtime_state->set_backend_id(request.backend_id);
+ }
+ if (request.__isset.import_label) {
+ _runtime_state->set_import_label(request.import_label);
+ }
+ if (request.__isset.db_name) {
+ _runtime_state->set_db_name(request.db_name);
+ }
+ if (request.__isset.load_job_id) {
+ _runtime_state->set_load_job_id(request.load_job_id);
+ }
- // 2. Build pipelines with operators in this fragment.
- auto root_pipeline = add_pipeline();
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
- _runtime_state->obj_pool(), request, *_query_ctx->desc_tbl,
&_root_op, root_pipeline));
+ if (request.is_simplified_param) {
+ _desc_tbl = _query_ctx->desc_tbl;
+ } else {
+ DCHECK(request.__isset.desc_tbl);
+ RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(),
request.desc_tbl,
+ &_desc_tbl));
+ }
+ _runtime_state->set_desc_tbl(_desc_tbl);
+ _runtime_state->set_num_per_fragment_instances(request.num_senders);
+ _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+ _runtime_state->set_total_load_streams(request.total_load_streams);
+ _runtime_state->set_num_local_sink(request.num_local_sink);
+
+ const auto& local_params = request.local_params[0];
+ if (local_params.__isset.runtime_filter_params) {
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ local_params.runtime_filter_params);
+ }
+ if (local_params.__isset.topn_filter_source_node_ids) {
+
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+ } else {
+ _query_ctx->init_runtime_predicates({0});
+ }
- // 3. Create sink operator
- if (!request.fragment.__isset.output_sink) {
- return Status::InternalError("No output sink in this fragment!");
+ _need_local_merge = request.__isset.parallel_instances;
}
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
- _runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs,
- request, root_pipeline->output_row_desc(), _runtime_state.get(),
*_desc_tbl,
- root_pipeline->id()));
- RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
- RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
- for (PipelinePtr& pipeline : _pipelines) {
- DCHECK(pipeline->sink_x() != nullptr) <<
pipeline->operator_xs().size();
-
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+ {
+ SCOPED_TIMER(_build_pipelines_timer);
+ // 2. Build pipelines with operators in this fragment.
+ auto root_pipeline = add_pipeline();
+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(),
request,
+
*_query_ctx->desc_tbl, &_root_op,
+ root_pipeline));
+
+ // 3. Create sink operator
+ if (!request.fragment.__isset.output_sink) {
+ return Status::InternalError("No output sink in this fragment!");
+ }
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
+ _runtime_state->obj_pool(), request.fragment.output_sink,
+ request.fragment.output_exprs, request,
root_pipeline->output_row_desc(),
+ _runtime_state.get(), *_desc_tbl, root_pipeline->id()));
+ RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
+ RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
+
+ for (PipelinePtr& pipeline : _pipelines) {
+ DCHECK(pipeline->sink_x() != nullptr) <<
pipeline->operator_xs().size();
+
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+ }
}
if (_enable_local_shuffle()) {
+ SCOPED_TIMER(_plan_local_shuffle_timer);
RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
@@ -311,12 +325,16 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
+ SCOPED_TIMER(_prepare_all_pipelines_timer);
pipeline->children().clear();
RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
}
- // 5. Build pipeline tasks and initialize local state.
- RETURN_IF_ERROR(_build_pipeline_tasks(request));
+ {
+ SCOPED_TIMER(_build_tasks_timer);
+ // 5. Build pipeline tasks and initialize local state.
+ RETURN_IF_ERROR(_build_pipeline_tasks(request));
+ }
_init_next_report_time();
@@ -1560,7 +1578,7 @@ Status PipelineFragmentContext::send_report(bool done) {
ReportStatusRequest req {true,
exec_status,
runtime_states,
- nullptr,
+ _runtime_profile.get(),
_runtime_state->load_channel_profile(),
done || !exec_status.ok(),
_query_ctx->coord_addr,
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index f4e324b6f53..1044282289b 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -218,6 +218,11 @@ private:
MonotonicStopWatch _fragment_watcher;
RuntimeProfile::Counter* _prepare_timer = nullptr;
+ RuntimeProfile::Counter* _init_context_timer = nullptr;
+ RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
+ RuntimeProfile::Counter* _plan_local_shuffle_timer = nullptr;
+ RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
+ RuntimeProfile::Counter* _build_tasks_timer = nullptr;
std::function<void(RuntimeState*, Status*)> _call_back;
bool _is_fragment_instance_closed = false;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 959b85bb537..40013fb33dd 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -267,6 +267,14 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
}
if (enable_profile) {
+ DCHECK(req.profile != nullptr);
+ TDetailedReportParams detailed_param;
+ detailed_param.__isset.fragment_instance_id = false;
+ detailed_param.__isset.profile = true;
+ detailed_param.__isset.loadChannelProfile = false;
+ detailed_param.__set_is_fragment_level(true);
+ req.profile->to_thrift(&detailed_param.profile);
+ params.detailed_report.push_back(detailed_param);
for (auto pipeline_profile :
req.runtime_state->pipeline_id_to_profile()) {
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index dc91210eeb7..76420045be6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -340,9 +340,10 @@ public class ExecutionProfile {
if (isPipelineXProfile) {
int pipelineIdx = 0;
List<RuntimeProfile> taskProfile = Lists.newArrayList();
+ String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
for (TDetailedReportParams param : params.detailed_report) {
- String name = "Pipeline :" + pipelineIdx + " "
- + " (host=" + backend.getHeartbeatAddress() + ")";
+ String name = param.isSetIsFragmentLevel() &&
param.is_fragment_level ? "Fragment Level Profile: "
+ + suffix : "Pipeline :" + pipelineIdx + " " + suffix;
RuntimeProfile profile = new RuntimeProfile(name);
taskProfile.add(profile);
if (param.isSetProfile()) {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index bd095db9dbc..aea1fe7604e 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -395,6 +395,7 @@ struct TDetailedReportParams {
1: optional Types.TUniqueId fragment_instance_id
2: optional RuntimeProfile.TRuntimeProfileTree profile
3: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
+ 4: optional bool is_fragment_level
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]