This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 174eea7246e (chore)[profile] remove non-pipeline logic from profile
(#35877)
174eea7246e is described below
commit 174eea7246e13ae44f966462b9e93a8c6be5df61
Author: zhiqiang <[email protected]>
AuthorDate: Wed Jun 5 09:44:54 2024 +0800
(chore)[profile] remove non-pipeline logic from profile (#35877)
Remove the code for non-pipeline profile processing.
---
be/src/pipeline/pipeline_fragment_context.cpp | 4 +-
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/query_context.cpp | 69 ++----
be/src/runtime/query_context.h | 26 +--
be/src/runtime/runtime_query_statistics_mgr.cpp | 155 +------------
be/src/runtime/runtime_query_statistics_mgr.h | 42 +---
.../doris/common/profile/ExecutionProfile.java | 250 ++++++---------------
.../org/apache/doris/common/profile/Profile.java | 3 -
.../main/java/org/apache/doris/qe/Coordinator.java | 15 +-
9 files changed, 109 insertions(+), 457 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index a9db08d0d70..9ddbd1b9150 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1557,8 +1557,8 @@ void PipelineFragmentContext::_close_fragment_instance() {
}
if (_query_ctx->enable_profile()) {
- _query_ctx->add_fragment_profile_x(_fragment_id,
collect_realtime_profile_x(),
-
collect_realtime_load_channel_profile_x());
+ _query_ctx->add_fragment_profile(_fragment_id,
collect_realtime_profile_x(),
+
collect_realtime_load_channel_profile_x());
}
// all submitted tasks done
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fc9fbc9764a..bb3ef4ff0f8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1261,7 +1261,7 @@ Status FragmentMgr::get_realtime_exec_status(const
TUniqueId& query_id,
}
if (query_context->enable_pipeline_x_exec()) {
- *exec_status = query_context->get_realtime_exec_status_x();
+ *exec_status = query_context->get_realtime_exec_status();
}
return Status::OK();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index dcc74c40e1c..a8efd4d9392 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -353,7 +353,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr&
tg) {
return Status::OK();
}
-void QueryContext::add_fragment_profile_x(
+void QueryContext::add_fragment_profile(
int fragment_id, const
std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
if (pipeline_profiles.empty()) {
@@ -375,70 +375,27 @@ void QueryContext::add_fragment_profile_x(
LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline
profile count {} ",
print_id(this->_query_id), fragment_id, pipeline_profiles.size());
- _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
+ _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
if (load_channel_profile != nullptr) {
- _load_channel_profile_map_x.insert(std::make_pair(fragment_id,
load_channel_profile));
- }
-}
-
-void QueryContext::add_instance_profile(const TUniqueId& instance_id,
- std::shared_ptr<TRuntimeProfileTree>
profile,
- std::shared_ptr<TRuntimeProfileTree>
load_channel_profile) {
- DCHECK(profile != nullptr) << print_id(instance_id);
-
- std::lock_guard<std::mutex> lg(_profile_mutex);
- _profile_map.insert(std::make_pair(instance_id, profile));
- if (load_channel_profile != nullptr) {
- _load_channel_profile_map.insert(std::make_pair(instance_id,
load_channel_profile));
+ _load_channel_profile_map.insert(std::make_pair(fragment_id,
load_channel_profile));
}
}
void QueryContext::_report_query_profile() {
- _report_query_profile_x();
- _report_query_profile_non_pipeline();
-}
-
-void QueryContext::_report_query_profile_non_pipeline() {
- if (enable_pipeline_x_exec()) {
- return;
- }
-
- std::lock_guard<std::mutex> lg(_profile_mutex);
- LOG_INFO("Query {}, register query profile, instance profile count {}",
print_id(_query_id),
- _profile_map.size());
-
- for (auto& [instance_id, instance_profile] : _profile_map) {
- std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
- if (_load_channel_profile_map.contains(instance_id)) {
- load_channel_profile = _load_channel_profile_map[instance_id];
- }
-
-
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
- _query_id, this->coord_addr, instance_id, instance_profile,
load_channel_profile);
- }
-
-
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
-}
-
-void QueryContext::_report_query_profile_x() {
- if (!enable_pipeline_x_exec()) {
- return;
- }
-
std::lock_guard<std::mutex> lg(_profile_mutex);
LOG_INFO(
"Pipeline x query context, register query profile, query {},
fragment profile count {}",
- print_id(_query_id), _profile_map_x.size());
+ print_id(_query_id), _profile_map.size());
- for (auto& [fragment_id, fragment_profile] : _profile_map_x) {
+ for (auto& [fragment_id, fragment_profile] : _profile_map) {
std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
- if (_load_channel_profile_map_x.contains(fragment_id)) {
- load_channel_profile = _load_channel_profile_map_x[fragment_id];
+ if (_load_channel_profile_map.contains(fragment_id)) {
+ load_channel_profile = _load_channel_profile_map[fragment_id];
}
-
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x(
+
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
_query_id, this->coord_addr, fragment_id, fragment_profile,
load_channel_profile);
}
@@ -446,7 +403,7 @@ void QueryContext::_report_query_profile_x() {
}
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
-QueryContext::_collect_realtime_query_profile_x() const {
+QueryContext::_collect_realtime_query_profile() const {
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
res;
if (!enable_pipeline_x_exec()) {
@@ -482,20 +439,20 @@ QueryContext::_collect_realtime_query_profile_x() const {
return res;
}
-TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const {
+TReportExecStatusParams QueryContext::get_realtime_exec_status() const {
TReportExecStatusParams exec_status;
if (enable_pipeline_x_exec()) {
- auto realtime_query_profile = _collect_realtime_query_profile_x();
+ auto realtime_query_profile = _collect_realtime_query_profile();
std::vector<std::shared_ptr<TRuntimeProfileTree>>
load_channel_profiles;
- for (auto load_channel_profile : _load_channel_profile_map_x) {
+ for (auto load_channel_profile : _load_channel_profile_map) {
if (load_channel_profile.second != nullptr) {
load_channel_profiles.push_back(load_channel_profile.second);
}
}
- exec_status =
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+ exec_status =
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
this->_query_id, std::move(realtime_query_profile),
std::move(load_channel_profiles), /*is_done=*/false);
} else {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 318afd69187..dc7ea7e29bf 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -322,7 +322,7 @@ private:
std::mutex _profile_mutex;
- // when fragment of pipeline x is closed, it will register its profile to
this map by using add_fragment_profile_x
+ // when fragment of pipeline is closed, it will register its profile to
this map by using add_fragment_profile
// flatten profile of one fragment:
// Pipeline 0
// PipelineTask 0
@@ -339,34 +339,22 @@ private:
// PipelineTask 3
// Operator 3
// fragment_id -> list<profile>
- std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_profile_map_x;
- std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>>
_load_channel_profile_map_x;
-
- // instance_id -> profile
- std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>
_profile_map;
- std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>
_load_channel_profile_map;
+ std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_profile_map;
+ std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>>
_load_channel_profile_map;
void _report_query_profile();
- void _report_query_profile_non_pipeline();
- void _report_query_profile_x();
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
- _collect_realtime_query_profile_x() const;
-
- std::unordered_map<TUniqueId,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
- _collect_realtime_query_profile_non_pipeline() const;
+ _collect_realtime_query_profile() const;
public:
- // when fragment of pipeline x is closed, it will register its profile to
this map by using add_fragment_profile_x
- void add_fragment_profile_x(
+ // when fragment of pipeline is closed, it will register its profile to
this map by using add_fragment_profile
+ void add_fragment_profile(
int fragment_id,
const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
pipeline_profile,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
- void add_instance_profile(const TUniqueId& iid,
std::shared_ptr<TRuntimeProfileTree> profile,
- std::shared_ptr<TRuntimeProfileTree>
load_channel_profile);
-
- TReportExecStatusParams get_realtime_exec_status_x() const;
+ TReportExecStatusParams get_realtime_exec_status() const;
bool enable_profile() const {
return _query_options.__isset.enable_profile &&
_query_options.enable_profile;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 3e3dd3de2dd..dda41936284 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -101,7 +101,7 @@ static Status _do_report_exec_stats_rpc(const
TNetworkAddress& coor_addr,
return Status::OK();
}
-TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
const TUniqueId& query_id,
std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
fragment_id_to_profile,
@@ -169,47 +169,6 @@ TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_para
return req;
}
-TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
- const TUniqueId& query_id,
- const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
- instance_id_to_profile,
- const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile,
- bool is_done) {
- TQueryProfile profile;
- std::vector<TUniqueId> fragment_instance_ids;
- std::vector<TRuntimeProfileTree> instance_profiles;
- std::vector<TRuntimeProfileTree> load_channel_profiles;
-
- for (auto entry : instance_id_to_profile) {
- TUniqueId instance_id = entry.first;
- std::shared_ptr<TRuntimeProfileTree> profile = entry.second;
-
- if (profile == nullptr) {
- auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
- print_id(query_id), print_id(instance_id));
- DCHECK(false) << msg;
- LOG_ERROR(msg);
- continue;
- }
-
- fragment_instance_ids.push_back(instance_id);
- instance_profiles.push_back(*profile);
- }
-
- profile.__set_query_id(query_id);
- profile.__set_fragment_instance_ids(fragment_instance_ids);
- profile.__set_instance_profiles(instance_profiles);
- profile.__set_load_channel_profiles(load_channel_profiles);
-
- TReportExecStatusParams res;
- res.__set_query_profile(profile);
- // invalid query id to avoid API compatibility during upgrade
- res.__set_query_id(TUniqueId());
- res.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
- res.__set_done(is_done);
- return res;
-}
-
void RuntimeQueryStatiticsMgr::start_report_thread() {
if (started.load()) {
DCHECK(false) << "report thread has been started";
@@ -230,8 +189,7 @@ void
RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
{
std::unique_lock<std::mutex> lock(_report_profile_mutex);
- while (_query_profile_map.empty() && _profile_map_x.empty() &&
- !_report_profile_thread_stop) {
+ while (_profile_map.empty() && !_report_profile_thread_stop) {
_report_profile_cv.wait_for(lock, std::chrono::seconds(3));
}
}
@@ -273,37 +231,7 @@ void RuntimeQueryStatiticsMgr::stop_report_thread() {
LOG_INFO("All report threads stopped");
}
-void RuntimeQueryStatiticsMgr::register_instance_profile(
- const TUniqueId& query_id, const TNetworkAddress& coor_addr, const
TUniqueId& instance_id,
- std::shared_ptr<TRuntimeProfileTree> instance_profile,
- std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
- if (instance_profile == nullptr) {
- auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
- print_id(query_id), print_id(instance_id));
- DCHECK(false) << msg;
- LOG_ERROR(msg);
- return;
- }
-
- std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
-
- if (!_query_profile_map.contains(query_id)) {
- _query_profile_map[query_id] = std::make_tuple(
- coor_addr, std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>());
- }
-
- std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
instance_profile_map =
- std::get<1>(_query_profile_map[query_id]);
- instance_profile_map.insert(std::make_pair(instance_id, instance_profile));
-
- if (load_channel_profile != nullptr) {
- _load_channel_profile_map[instance_id] = load_channel_profile;
- }
-
- LOG_INFO("Register instance profile {} {}", print_id(query_id),
print_id(instance_id));
-}
-
-void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
+void RuntimeQueryStatiticsMgr::register_fragment_profile(
const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t
fragment_id,
std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x) {
@@ -319,95 +247,34 @@ void
RuntimeQueryStatiticsMgr::register_fragment_profile_x(
std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
- if (!_profile_map_x.contains(query_id)) {
- _profile_map_x[query_id] = std::make_tuple(
+ if (!_profile_map.contains(query_id)) {
+ _profile_map[query_id] = std::make_tuple(
coor_addr,
std::unordered_map<int,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>());
}
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
- fragment_profile_map = std::get<1>(_profile_map_x[query_id]);
+ fragment_profile_map = std::get<1>(_profile_map[query_id]);
fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles));
if (load_channel_profile_x != nullptr) {
- _load_channel_profile_map_x[std::make_pair(query_id, fragment_id)] =
load_channel_profile_x;
+ _load_channel_profile_map[std::make_pair(query_id, fragment_id)] =
load_channel_profile_x;
}
LOG_INFO("register x profile done {}, fragment {}, profiles {}",
print_id(query_id),
fragment_id, p_profiles.size());
}
-void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() {
- // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
- decltype(_query_profile_map) profile_copy;
+void RuntimeQueryStatiticsMgr::_report_query_profiles_function() {
+ decltype(_profile_map) profile_copy;
decltype(_load_channel_profile_map) load_channel_profile_copy;
{
std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
- _query_profile_map.swap(profile_copy);
+ _profile_map.swap(profile_copy);
_load_channel_profile_map.swap(load_channel_profile_copy);
}
- // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
- for (const auto& entry : profile_copy) {
- const auto& query_id = entry.first;
- const auto& coor_addr = std::get<0>(entry.second);
- const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
- instance_id_to_profile = std::get<1>(entry.second);
-
- for (const auto& profile_entry : instance_id_to_profile) {
- const auto& instance_id = profile_entry.first;
- const auto& instance_profile = profile_entry.second;
-
- if (instance_profile == nullptr) {
- auto msg = fmt::format("Query {} instance {} profile is null",
print_id(query_id),
- print_id(instance_id));
- DCHECK(false) << msg;
- LOG_ERROR(msg);
- continue;
- }
- }
-
- std::vector<std::shared_ptr<TRuntimeProfileTree>>
load_channel_profiles;
- for (const auto& load_channel_profile : load_channel_profile_copy) {
- if (load_channel_profile.second == nullptr) {
- auto msg = fmt::format(
- "Register fragment profile {} {} failed, load channel
profile is null",
- print_id(query_id), -1);
- DCHECK(false) << msg;
- LOG_ERROR(msg);
- continue;
- }
-
- load_channel_profiles.push_back(load_channel_profile.second);
- }
-
- TReportExecStatusParams req =
create_report_exec_status_params_non_pipeline(
- query_id, instance_id_to_profile, load_channel_profiles,
/*is_done=*/true);
- TReportExecStatusResult res;
- auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
-
- if (res.status.status_code != TStatusCode::OK) {
- std::stringstream ss;
- res.status.printTo(ss);
- LOG_WARNING("Query {} send profile to {} failed, msg: {}",
print_id(query_id),
- PrintThriftNetworkAddress(coor_addr), ss.str());
- } else {
- LOG_INFO("Send {} profile finished", print_id(query_id));
- }
- }
-}
-
-void RuntimeQueryStatiticsMgr::_report_query_profiles_x() {
- decltype(_profile_map_x) profile_copy;
- decltype(_load_channel_profile_map_x) load_channel_profile_copy;
-
- {
- std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
- _profile_map_x.swap(profile_copy);
- _load_channel_profile_map_x.swap(load_channel_profile_copy);
- }
-
// query_id -> {coordinator_addr, {fragment_id ->
std::vectpr<pipeline_profile>}}
for (auto& entry : profile_copy) {
const auto& query_id = entry.first;
@@ -435,7 +302,7 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_x() {
load_channel_profiles.push_back(load_channel_profile.second);
}
- TReportExecStatusParams req = create_report_exec_status_params_x(
+ TReportExecStatusParams req = create_report_exec_status_params(
query_id, std::move(fragment_profile_map),
std::move(load_channel_profiles),
/*is_done=*/true);
TReportExecStatusResult res;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
index ff61f665342..5fb2332c335 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -69,19 +69,12 @@ public:
RuntimeQueryStatiticsMgr() = default;
~RuntimeQueryStatiticsMgr() = default;
- static TReportExecStatusParams create_report_exec_status_params_x(
+ static TReportExecStatusParams create_report_exec_status_params(
const TUniqueId& q_id,
std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
fragment_id_to_profile,
std::vector<std::shared_ptr<TRuntimeProfileTree>>
load_channel_profile, bool is_done);
- static TReportExecStatusParams
create_report_exec_status_params_non_pipeline(
- const TUniqueId& q_id,
- const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
- instance_id_to_profile,
- const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile,
- bool is_done);
-
void register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr, TQueryType::type
query_type);
@@ -105,15 +98,10 @@ public:
void trigger_report_profile();
void stop_report_thread();
- void register_instance_profile(const TUniqueId& query_id, const
TNetworkAddress& coor_addr,
- const TUniqueId& instance_id,
- std::shared_ptr<TRuntimeProfileTree>
instance_profile,
- std::shared_ptr<TRuntimeProfileTree>
load_channel_profile);
-
- void register_fragment_profile_x(const TUniqueId& query_id, const
TNetworkAddress& const_addr,
- int32_t fragment_id,
-
std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
- std::shared_ptr<TRuntimeProfileTree>
load_channel_profile_x);
+ void register_fragment_profile(const TUniqueId& query_id, const
TNetworkAddress& const_addr,
+ int32_t fragment_id,
+
std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
+ std::shared_ptr<TRuntimeProfileTree>
load_channel_profile_x);
private:
std::shared_mutex _qs_ctx_map_lock;
@@ -125,13 +113,7 @@ private:
std::condition_variable _report_profile_cv;
bool _report_profile_thread_stop = false;
- void _report_query_profiles_function() {
- _report_query_profiles_x();
- _report_query_profiles_non_pipeline();
- }
-
- void _report_query_profiles_x();
- void _report_query_profiles_non_pipeline();
+ void _report_query_profiles_function();
std::shared_mutex _query_profile_map_lock;
@@ -140,17 +122,9 @@ private:
TUniqueId,
std::tuple<TNetworkAddress,
std::unordered_map<int,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>>>
- _profile_map_x;
+ _profile_map;
std::unordered_map<std::pair<TUniqueId, int32_t>,
std::shared_ptr<TRuntimeProfileTree>>
- _load_channel_profile_map_x;
-
- // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
- std::unordered_map<
- TUniqueId,
- std::tuple<TNetworkAddress,
- std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>>>
- _query_profile_map;
- std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>
_load_channel_profile_map;
+ _load_channel_profile_map;
};
} // namespace doris
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index e8b450b530c..9c08ab343d6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -78,7 +78,6 @@ public class ExecutionProfile {
private RuntimeProfile loadChannelProfile;
// FragmentId -> InstanceId -> RuntimeProfile
private Map<PlanFragmentId, Map<TUniqueId, RuntimeProfile>>
fragmentInstancesProfiles;
- private boolean isPipelineXProfile = false;
// use to merge profile from multi be
private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>>
multiBeProfile = null;
@@ -138,7 +137,7 @@ public class ExecutionProfile {
return allPipelines;
}
- private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String>
planNodeMap) {
+ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String>
planNodeMap) {
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment "
+ i);
@@ -158,80 +157,34 @@ public class ExecutionProfile {
return fragmentsProfile;
}
- private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer,
String> planNodeMap) {
- RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
- for (int i = 0; i < fragmentProfiles.size(); ++i) {
- RuntimeProfile oldFragmentProfile =
fragmentProfiles.get(seqNoToFragmentId.get(i));
- RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment "
+ i);
- fragmentsProfile.addChild(newFragmentProfile);
- List<RuntimeProfile> allInstanceProfiles = new
ArrayList<RuntimeProfile>();
- for (Pair<RuntimeProfile, Boolean> runtimeProfile :
oldFragmentProfile.getChildList()) {
- allInstanceProfiles.add(runtimeProfile.first);
- }
- RuntimeProfile mergedInstanceProfile = new
RuntimeProfile("Instance" + "(instance_num="
- + allInstanceProfiles.size() + ")",
allInstanceProfiles.get(0).nodeId());
- newFragmentProfile.addChild(mergedInstanceProfile);
- RuntimeProfile.mergeProfiles(allInstanceProfiles,
mergedInstanceProfile, planNodeMap);
- }
- return fragmentsProfile;
- }
-
public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String>
planNodeMap) {
- if (isPipelineXProfile) {
- /*
- * Fragment 0
- * ---Pipeline 0
- * ------pipelineTask 0
- * ------pipelineTask 0
- * ------pipelineTask 0
- * ---Pipeline 1
- * ------pipelineTask 1
- * ---Pipeline 2
- * ------pipelineTask 2
- * ------pipelineTask 2
- * Fragment 1
- * ---Pipeline 0
- * ------......
- * ---Pipeline 1
- * ------......
- * ---Pipeline 2
- * ------......
- * ......
- */
- return getPipelineXAggregatedProfile(planNodeMap);
- } else {
- /*
- * Fragment 0
- * ---Instance 0
- * ------pipelineTask 0
- * ------pipelineTask 1
- * ------pipelineTask 2
- * ---Instance 1
- * ------pipelineTask 0
- * ------pipelineTask 1
- * ------pipelineTask 2
- * ---Instance 2
- * ------pipelineTask 0
- * ------pipelineTask 1
- * ------pipelineTask 2
- * Fragment 1
- * ---Instance 0
- * ---Instance 1
- * ---Instance 2
- * ......
- */
- return getNonPipelineXAggregatedProfile(planNodeMap);
- }
+ /*
+ * Fragment 0
+ * ---Pipeline 0
+ * ------pipelineTask 0
+ * ------pipelineTask 0
+ * ------pipelineTask 0
+ * ---Pipeline 1
+ * ------pipelineTask 1
+ * ---Pipeline 2
+ * ------pipelineTask 2
+ * ------pipelineTask 2
+ * Fragment 1
+ * ---Pipeline 0
+ * ------......
+ * ---Pipeline 1
+ * ------......
+ * ---Pipeline 2
+ * ------......
+ * ......
+ */
+ return getPipelineAggregatedProfile(planNodeMap);
}
public RuntimeProfile getRoot() {
return root;
}
- public void setPipelineX() {
- this.isPipelineXProfile = true;
- }
-
// The execution profile is maintained in ProfileManager, if it is
finished, then should
// remove it from it as soon as possible.
public void update(long startTime, boolean isFinished) {
@@ -255,76 +208,32 @@ public class ExecutionProfile {
return new Status(TStatusCode.INVALID_ARGUMENT, "QueryId is not
set");
}
- if (isPipelineXProfile) {
- if (!profile.isSetFragmentIdToProfile()) {
- LOG.warn("{} FragmentIdToProfile is not set",
DebugUtil.printId(profile.getQueryId()));
- return new Status(TStatusCode.INVALID_ARGUMENT,
"FragmentIdToProfile is not set");
- }
-
- for (Entry<Integer, List<TDetailedReportParams>> entry :
profile.getFragmentIdToProfile().entrySet()) {
- int fragmentId = entry.getKey();
- List<TDetailedReportParams> fragmentProfile = entry.getValue();
- int pipelineIdx = 0;
- List<RuntimeProfile> taskProfile = Lists.newArrayList();
- for (TDetailedReportParams pipelineProfile : fragmentProfile) {
- String name = "Pipeline :" + pipelineIdx + " "
- + " (host=" + backendHBAddress + ")";
- RuntimeProfile profileNode = new RuntimeProfile(name);
- taskProfile.add(profileNode);
- if (!pipelineProfile.isSetProfile()) {
- LOG.warn("Profile is not set, {}",
DebugUtil.printId(profile.getQueryId()));
- return new Status(TStatusCode.INVALID_ARGUMENT,
"Profile is not set");
- }
-
- profileNode.update(pipelineProfile.profile);
- profileNode.setIsDone(isDone);
- pipelineIdx++;
- fragmentProfiles.get(fragmentId).addChild(profileNode);
- }
- multiBeProfile.get(fragmentId).put(backendHBAddress,
taskProfile);
- }
- } else {
- if (!profile.isSetInstanceProfiles() ||
!profile.isSetFragmentInstanceIds()) {
- LOG.warn("InstanceIdToProfile is not set, {}",
DebugUtil.printId(profile.getQueryId()));
- return new Status(TStatusCode.INVALID_ARGUMENT,
"InstanceIdToProfile is not set");
- }
-
- if (profile.fragment_instance_ids.size() !=
profile.instance_profiles.size()) {
- LOG.warn("InstanceIdToProfile size is not equal, {}",
- DebugUtil.printId(profile.getQueryId()));
- return new Status(TStatusCode.INVALID_ARGUMENT,
"InstanceIdToProfile size is not equal");
- }
+ if (!profile.isSetFragmentIdToProfile()) {
+ LOG.warn("{} FragmentIdToProfile is not set",
DebugUtil.printId(profile.getQueryId()));
+ return new Status(TStatusCode.INVALID_ARGUMENT,
"FragmentIdToProfile is not set");
+ }
- for (int idx = 0; idx < profile.getFragmentInstanceIdsSize();
idx++) {
- TUniqueId instanceId =
profile.getFragmentInstanceIds().get(idx);
- TRuntimeProfileTree instanceProfile =
profile.getInstanceProfiles().get(idx);
- if (instanceProfile == null) {
- LOG.warn("Profile is not set {}",
DebugUtil.printId(profile.getQueryId()));
+ for (Entry<Integer, List<TDetailedReportParams>> entry :
profile.getFragmentIdToProfile().entrySet()) {
+ int fragmentId = entry.getKey();
+ List<TDetailedReportParams> fragmentProfile = entry.getValue();
+ int pipelineIdx = 0;
+ List<RuntimeProfile> taskProfile = Lists.newArrayList();
+ for (TDetailedReportParams pipelineProfile : fragmentProfile) {
+ String name = "Pipeline :" + pipelineIdx + " "
+ + " (host=" + backendHBAddress + ")";
+ RuntimeProfile profileNode = new RuntimeProfile(name);
+ taskProfile.add(profileNode);
+ if (!pipelineProfile.isSetProfile()) {
+ LOG.warn("Profile is not set, {}",
DebugUtil.printId(profile.getQueryId()));
return new Status(TStatusCode.INVALID_ARGUMENT, "Profile
is not set");
}
- PlanFragmentId fragmentId =
instanceIdToFragmentId.get(instanceId);
- if (fragmentId == null) {
- LOG.warn("Could not find related fragment for instance {}",
- DebugUtil.printId(instanceId));
- return new Status(TStatusCode.INVALID_ARGUMENT, "Could not
find related fragment");
- }
-
- // Do not use fragment id in params, because non-pipeline
engine will set it to -1
- Map<TUniqueId, RuntimeProfile> instanceProfiles =
fragmentInstancesProfiles.get(fragmentId);
- if (instanceProfiles == null) {
- LOG.warn("Could not find related instances for fragment
{}", fragmentId);
- return new Status(TStatusCode.INVALID_ARGUMENT, "Could not
find related instance");
- }
-
- RuntimeProfile curInstanceProfile =
instanceProfiles.get(instanceId);
- if (curInstanceProfile == null) {
- LOG.warn("Could not find related profile {}",
DebugUtil.printId(instanceId));
- return new Status(TStatusCode.INVALID_ARGUMENT, "Could not
find related instance");
- }
- curInstanceProfile.setIsDone(isDone);
- curInstanceProfile.update(instanceProfile);
+ profileNode.update(pipelineProfile.profile);
+ profileNode.setIsDone(isDone);
+ pipelineIdx++;
+ fragmentProfiles.get(fragmentId).addChild(profileNode);
}
+ multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
}
if (profile.isSetLoadChannelProfiles()) {
@@ -348,59 +257,32 @@ public class ExecutionProfile {
LOG.warn("backend id is not set in report profile request, bad
message");
return;
}
- if (isPipelineXProfile) {
- int pipelineIdx = 0;
- List<RuntimeProfile> taskProfile = Lists.newArrayList();
- String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
- for (TDetailedReportParams param : params.detailed_report) {
- String name = param.isSetIsFragmentLevel() &&
param.is_fragment_level ? "Fragment Level Profile: "
- + suffix : "Pipeline :" + pipelineIdx + " " + suffix;
- RuntimeProfile profile = new RuntimeProfile(name);
- taskProfile.add(profile);
- if (param.isSetProfile()) {
- profile.update(param.profile);
- }
- if (params.done) {
- profile.setIsDone(true);
- }
- pipelineIdx++;
- profile.sortChildren();
- fragmentProfiles.get(params.fragment_id).addChild(profile);
- }
- // TODO ygl: is this right? there maybe multi Backends, what does
- // update load profile do???
- if (params.isSetLoadChannelProfile()) {
- loadChannelProfile.update(params.loadChannelProfile);
- }
-
multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(),
taskProfile);
- } else {
- PlanFragmentId fragmentId =
instanceIdToFragmentId.get(params.fragment_instance_id);
- if (fragmentId == null) {
- LOG.warn("Could not find related fragment for instance {}",
- DebugUtil.printId(params.fragment_instance_id));
- return;
- }
- // Do not use fragment id in params, because non-pipeline engine
will set it to -1
- Map<TUniqueId, RuntimeProfile> instanceProfiles =
fragmentInstancesProfiles.get(fragmentId);
- if (instanceProfiles == null) {
- LOG.warn("Could not find related instances for fragment {}",
fragmentId);
- return;
- }
- RuntimeProfile instanceProfile =
instanceProfiles.get(params.fragment_instance_id);
- if (instanceProfile == null) {
- LOG.warn("Could not find related instance {}",
params.fragment_instance_id);
- return;
- }
- if (params.isSetProfile()) {
- instanceProfile.update(params.profile);
- }
- if (params.isSetDone() && params.isDone()) {
- instanceProfile.setIsDone(true);
+
+ int pipelineIdx = 0;
+ List<RuntimeProfile> taskProfile = Lists.newArrayList();
+ String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
+ for (TDetailedReportParams param : params.detailed_report) {
+ String name = param.isSetIsFragmentLevel() &&
param.is_fragment_level ? "Fragment Level Profile: "
+ + suffix : "Pipeline :" + pipelineIdx + " " + suffix;
+ RuntimeProfile profile = new RuntimeProfile(name);
+ taskProfile.add(profile);
+ if (param.isSetProfile()) {
+ profile.update(param.profile);
}
- if (params.isSetLoadChannelProfile()) {
- loadChannelProfile.update(params.loadChannelProfile);
+ if (params.done) {
+ profile.setIsDone(true);
}
+ pipelineIdx++;
+ profile.sortChildren();
+ fragmentProfiles.get(params.fragment_id).addChild(profile);
+ }
+ // TODO ygl: is this right? there maybe multi Backends, what does
+ // update load profile do???
+ if (params.isSetLoadChannelProfile()) {
+ loadChannelProfile.update(params.loadChannelProfile);
}
+
+
multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(),
taskProfile);
}
// MultiInstances may update the profile concurrently
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index c0d3614550d..12b3b903880 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -79,9 +79,6 @@ public class Profile {
LOG.warn("try to set a null excecution profile, it is abnormal",
new Exception());
return;
}
- if (this.isPipelineX) {
- executionProfile.setPipelineX();
- }
executionProfile.setSummaryProfile(summaryProfile);
this.executionProfiles.add(executionProfile);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 265d3afdb35..94e7d59625a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -387,10 +387,6 @@ public class Coordinator implements CoordInterface {
//
https://github.com/apache/doris/blob/bd6f5b6a0e5f1b12744607336123d7f97eb76af9/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java#L155
this.enablePipelineEngine = Config.enable_pipeline_load;
this.enablePipelineXEngine = Config.enable_pipeline_load;
- // make sure Coordinator can update profile correctlly
- if (this.enablePipelineXEngine) {
- this.executionProfile.setPipelineX();
- }
}
private void setFromUserProperty(ConnectContext connectContext) {
@@ -3248,16 +3244,7 @@ public class Coordinator implements CoordInterface {
this.lastMissingHeartbeatTime =
backend.getLastMissingHeartbeatTime();
this.enablePipelineX = enablePipelineX;
- if (this.enablePipelineX) {
- executionProfile.addFragmentBackend(fragmentId, backendId);
- } else {
- for (TPipelineInstanceParams instanceParam :
rpcParams.local_params) {
- String profileName = "Instance " +
DebugUtil.printId(instanceParam.fragment_instance_id)
- + " (host=" + address + ")";
- executionProfile.addInstanceProfile(fragmentId,
instanceParam.fragment_instance_id,
- new RuntimeProfile(profileName));
- }
- }
+ executionProfile.addFragmentBackend(fragmentId, backendId);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]