yiguolei commented on code in PR #33015:
URL: https://github.com/apache/doris/pull/33015#discussion_r1544141992
##########
be/src/runtime/query_context.cpp:
##########
@@ -281,4 +293,77 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
return Status::OK();
}
+void QueryContext::async_report_profile_x() {
+ if (!enable_pipeline_x_exec()) {
+ return;
+ }
+
+ std::lock_guard<std::mutex> lg(_profile_mutex);
+ LOG_INFO(
+ "Pipeline x query context, register query profile, query {}, fragment profile count {}",
+ print_id(_query_id), _profile_map_x.size());
+
+ for (auto& [fid, f_profile] : _profile_map_x) {
+ auto tmp_f_profile = std::make_shared<profile::FragmentProfileX>();
+
+ for (auto p_profile : f_profile.second) {
+ tmp_f_profile->pipeline_profiles.push_back(
+ std::make_shared<profile::PipelineProfileX>(fid, f_profile.first, p_profile));
+ }
+
+ ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x(
+ _query_id, fid, this->coord_addr, tmp_f_profile);
+ }
+
+ _profile_map_x.clear();
+}
+
+void QueryContext::add_pipeline_profile_x(int f_id, bool finished,
+ profile::TRuntimeProfilePtr profile) {
+ std::lock_guard<std::mutex> l(_profile_mutex);
+ LOG_INFO("Query X {} add pipeline profile, fid {}", print_id(this->_query_id), f_id);
+ _profile_map_x[f_id].first = finished;
+ _profile_map_x[f_id].second.push_back(profile);
+}
+
+void QueryContext::add_fragment_profile_x(
+ int f_id, bool finished, const std::vector<profile::TRuntimeProfilePtr>& pipeline_profile) {
+ LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ",
+ print_id(this->_query_id), f_id, pipeline_profile.size());
+
+ std::lock_guard<std::mutex> l(_profile_mutex);
+ _profile_map_x[f_id] = std::make_pair(finished, pipeline_profile);
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& iid, bool finished,
+ profile::TRuntimeProfilePtr profile) {
+ // LOG_INFO("Query {} add instance profile, iid {}, finished {}", print_id(this->_query_id),
+ // print_id(iid), finished);
+ DCHECK(profile != nullptr) << print_id(iid);
+ std::lock_guard<std::mutex> lg(_profile_mutex);
+ _profile_map[print_id(iid)] =
+ std::make_pair(finished, std::make_shared<profile::InstanceProfile>(iid, profile));
+}
Review Comment:
PlanFragmentExecutor
PipelineXFragmentContext
Pipeline...
collect_profile ---> vector<fragmentid/ instanceid TRuntimeProfile>
QueryContext
std::vector<std::weak_ptr<PipelineXFragmentContext>>
std::vector<std::weak_ptr<PipelineXFragmentContext>>
std::vector<std::weak_ptr<PipelineXFragmentContext>>
map<> // 析构的时候,给我的 (English: "handed over to me at destruction time")
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]