This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7d5a6abb9c2 [refactor](profile) refactor profile report on BE (#33331)
7d5a6abb9c2 is described below
commit 7d5a6abb9c2439e29b89bad9b8a35ee1284adf41
Author: zhiqiang <[email protected]>
AuthorDate: Wed Apr 17 13:54:03 2024 +0800
[refactor](profile) refactor profile report on BE (#33331)
First task of #33744
During the close stage, PipelineXFragmentContext will collect its profiles and
register them with QueryContext.
The destructor of QueryContext will add all of its profiles to
RuntimeQueryStatiticsMgr and trigger a report thread to report them.
The report thread of RuntimeQueryStatiticsMgr will collect all registered
profiles and report them query by query.
RuntimeQueryStatiticsMgr will create 5 threads to do the profile report task;
when BE stops gracefully, they will flush all profiles, which is important in
cloud mode.
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 1 +
be/src/pipeline/pipeline_fragment_context.h | 2 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 90 ++++-
.../pipeline_x/pipeline_x_fragment_context.h | 3 +
be/src/runtime/exec_env_init.cpp | 5 +-
be/src/runtime/fragment_mgr.cpp | 59 +++
be/src/runtime/fragment_mgr.h | 5 +
be/src/runtime/plan_fragment_executor.cpp | 33 ++
be/src/runtime/plan_fragment_executor.h | 4 +
be/src/runtime/query_context.cpp | 181 ++++++++-
be/src/runtime/query_context.h | 58 +++
be/src/runtime/runtime_query_statistics_mgr.cpp | 421 +++++++++++++++++++++
be/src/runtime/runtime_query_statistics_mgr.h | 70 ++++
be/src/service/backend_service.cpp | 26 ++
be/src/service/backend_service.h | 3 +
.../doris/common/profile/ExecutionProfile.java | 79 ++++
.../java/org/apache/doris/qe/QeProcessorImpl.java | 33 ++
.../org/apache/doris/common/GenericPoolTest.java | 8 +
.../apache/doris/utframe/MockedBackendFactory.java | 8 +
gensrc/thrift/BackendService.thrift | 13 +
gensrc/thrift/FrontendService.thrift | 22 +-
22 files changed, 1112 insertions(+), 14 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 13acfd49042..603b1739062 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1153,6 +1153,8 @@ DEFINE_mInt32(report_query_statistics_interval_ms,
"3000");
// 30s
DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000");
+DEFINE_mInt32(report_exec_status_thread_num, "5");
+
// consider two high usage disk at the same available level if they do not
exceed this diff.
DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9f226e22be2..2aca6a31fbb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1235,6 +1235,7 @@ DECLARE_Int32(ignore_invalid_partition_id_rowset_num);
DECLARE_mInt32(report_query_statistics_interval_ms);
DECLARE_mInt32(query_statistics_reserve_timeout_ms);
+DECLARE_mInt32(report_exec_status_thread_num);
// consider two high usage disk at the same available level if they do not
exceed this diff.
DECLARE_mDouble(high_disk_avail_level_diff_usages);
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 009a2a2f22d..b9bfcb28f68 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -156,7 +156,7 @@ protected:
ExecEnv* _exec_env = nullptr;
- bool _prepared = false;
+ std::atomic_bool _prepared = false;
bool _submitted = false;
std::mutex _status_lock;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index f23e39472ab..3b03fbbf400 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -21,6 +21,7 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Planner_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
#include <pthread.h>
#include <runtime/result_buffer_mgr.h>
@@ -1378,6 +1379,12 @@ void
PipelineXFragmentContext::_close_fragment_instance() {
LOG_INFO("Query {} fragment {} profile:\n {}",
print_id(this->_query_id),
this->_fragment_id, ss.str());
}
+
+ if (_query_ctx->enable_profile()) {
+ _query_ctx->add_fragment_profile_x(_fragment_id,
collect_realtime_profile_x(),
+
collect_realtime_load_channel_profile_x());
+ }
+
// all submitted tasks done
_exec_env->fragment_mgr()->remove_pipeline_context(
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
@@ -1409,15 +1416,82 @@ Status PipelineXFragmentContext::send_report(bool done)
{
for (auto& task_state : _task_runtime_states) {
runtime_states.push_back(task_state.get());
}
+
+ ReportStatusRequest req {true,
+ exec_status,
+ runtime_states,
+ nullptr,
+ _runtime_state->load_channel_profile(),
+ done || !exec_status.ok(),
+ _query_ctx->coord_addr,
+ _query_id,
+ _fragment_id,
+ TUniqueId(),
+ _backend_num,
+ _runtime_state.get(),
+ [this](Status st) { return update_status(st); },
+ [this](const PPlanFragmentCancelReason& reason,
+ const std::string& msg) { cancel(reason,
msg); }};
+
return _report_status_cb(
- {true, exec_status, runtime_states, nullptr,
_runtime_state->load_channel_profile(),
- done || !exec_status.ok(), _query_ctx->coord_addr, _query_id,
_fragment_id,
- TUniqueId(), _backend_num, _runtime_state.get(),
- [this](Status st) { return update_status(st); },
- [this](const PPlanFragmentCancelReason& reason, const
std::string& msg) {
- cancel(reason, msg);
- }},
-
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
+ req,
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
+}
+
+// Serializes every profile returned by _runtime_state->pipeline_id_to_profile() into
+// thrift form, one TRuntimeProfileTree per entry.
+// Returns an empty vector when this fragment context has not been prepared yet.
+std::vector<std::shared_ptr<TRuntimeProfileTree>>
+PipelineXFragmentContext::collect_realtime_profile_x() const {
+    DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
+            << fmt::format("Query {} calling a pipeline X function, but its pipeline X is disabled",
+                           print_id(_query_id));
+
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> thrift_profiles;
+
+    // No mutex guards pipeline_id_to_profile, so it is only read once the prepare
+    // stage (which initializes it) has finished.
+    if (!_prepared) {
+        std::string msg =
+                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return thrift_profiles;
+    }
+
+    for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
+        auto& thrift_tree = thrift_profiles.emplace_back(std::make_shared<TRuntimeProfileTree>());
+        pipeline_profile->to_thrift(thrift_tree.get());
+    }
+
+    return thrift_profiles;
+}
+
+// Produces a thrift snapshot of this fragment's load channel profile.
+// For every task runtime state that has a runtime profile, that profile is serialized
+// and merged (via update()) into _runtime_state->load_channel_profile(); the merged
+// profile is then serialized and returned.
+// Returns nullptr when this fragment context has not been prepared yet.
+std::shared_ptr<TRuntimeProfileTree>
+PipelineXFragmentContext::collect_realtime_load_channel_profile_x() const {
+    // The profiles read below are not guarded by a mutex, so they are only touched
+    // after the fragment context has been prepared.
+    if (!_prepared) {
+        std::string msg =
+                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return nullptr;
+    }
+
+    for (auto& task_state : _task_runtime_states) {
+        if (task_state->runtime_profile() != nullptr) {
+            auto task_profile_tree = std::make_shared<TRuntimeProfileTree>();
+            task_state->runtime_profile()->to_thrift(task_profile_tree.get());
+            _runtime_state->load_channel_profile()->update(*task_profile_tree);
+        }
+    }
+
+    auto merged_tree = std::make_shared<TRuntimeProfileTree>();
+    _runtime_state->load_channel_profile()->to_thrift(merged_tree.get());
+    return merged_tree;
 }
std::string PipelineXFragmentContext::debug_string() {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 34d00c07652..31febc0d8aa 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -112,6 +112,9 @@ public:
[[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id;
}
+ std::vector<std::shared_ptr<TRuntimeProfileTree>>
collect_realtime_profile_x() const;
+ std::shared_ptr<TRuntimeProfileTree>
collect_realtime_load_channel_profile_x() const;
+
std::string debug_string() override;
private:
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 76c877c3695..37cd51d1ca4 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -308,7 +308,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
_workload_sched_mgr->start(this);
RETURN_IF_ERROR(_spill_stream_mgr->init());
-
+ _runtime_query_statistics_mgr->start_report_thread();
_s_ready = true;
return Status::OK();
@@ -606,6 +606,9 @@ void ExecEnv::destroy() {
_storage_engine.reset();
SAFE_STOP(_spill_stream_mgr);
+ if (_runtime_query_statistics_mgr) {
+ _runtime_query_statistics_mgr->stop_report_thread();
+ }
SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
SAFE_SHUTDOWN(_join_node_thread_pool);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 14f77a98ace..bc80dca583d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -29,6 +29,7 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Planner_types.h>
#include <gen_cpp/QueryPlanExtra_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <pthread.h>
@@ -36,6 +37,7 @@
#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/transport/TTransportException.h>
+#include <unistd.h>
#include <atomic>
@@ -47,6 +49,7 @@
#include <memory>
#include <mutex>
#include <sstream>
+#include <unordered_map>
#include <utility>
#include "common/config.h"
@@ -483,6 +486,7 @@ void
FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
std::chrono::system_clock::now().time_since_epoch())
.count();
std::lock_guard<std::mutex> lock(_lock);
+
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
g_fragment_executing_count << -1;
@@ -1612,4 +1616,59 @@ void
FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i
}
}
+// Builds a realtime snapshot of the execution profile of a query running on this BE.
+//
+// query_id:    id of the query whose profile is requested.
+// exec_status: output parameter that receives the assembled report params.
+//
+// Returns InvalidArgument when exec_status is nullptr, and NotFound when the query or
+// (non pipeline X path only) one of its fragment instances is not registered in this
+// FragmentMgr. Returns OK otherwise.
+Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
+                                             TReportExecStatusParams* exec_status) {
+    if (exec_status == nullptr) {
+        return Status::InvalidArgument("exec_status is nullptr");
+    }
+
+    std::shared_ptr<QueryContext> query_context = nullptr;
+
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        // find() instead of operator[]: operator[] would insert a null entry into
+        // _query_ctx_map every time an unknown query id is asked for.
+        if (auto iter = _query_ctx_map.find(query_id); iter != _query_ctx_map.end()) {
+            query_context = iter->second;
+        }
+    }
+
+    if (query_context == nullptr) {
+        return Status::NotFound("Query {} not found", print_id(query_id));
+    }
+
+    if (query_context->enable_pipeline_x_exec()) {
+        *exec_status = query_context->get_realtime_exec_status_x();
+        return Status::OK();
+    }
+
+    auto instance_ids = query_context->get_fragment_instance_ids();
+    std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> instance_profiles;
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
+
+    for (const auto& instance_id : instance_ids) {
+        std::shared_ptr<PlanFragmentExecutor> instance_executor = nullptr;
+
+        {
+            // Only the map lookup happens under _lock; the profile itself is collected
+            // after the lock has been released.
+            std::lock_guard<std::mutex> lock(_lock);
+            // Same reasoning as above: never create a null entry as a lookup side effect.
+            if (auto iter = _fragment_instance_map.find(instance_id);
+                iter != _fragment_instance_map.end()) {
+                instance_executor = iter->second;
+            }
+        }
+
+        if (instance_executor == nullptr) {
+            return Status::NotFound("Fragment instance {} not found", print_id(instance_id));
+        }
+
+        auto instance_profile = instance_executor->collect_realtime_query_profile();
+        if (instance_profile == nullptr) {
+            // No query profile for this instance: its load channel profile is skipped too.
+            continue;
+        }
+        instance_profiles.insert(std::make_pair(instance_id, instance_profile));
+
+        if (auto load_channel_profile =
+                    instance_executor->collect_realtime_load_channel_profile()) {
+            load_channel_profiles.push_back(load_channel_profile);
+        }
+    }
+
+    *exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+            query_id, instance_profiles, load_channel_profiles);
+
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index e748c88cece..25b555f4fe8 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -17,6 +17,7 @@
#pragma once
+#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
#include <stdint.h>
@@ -33,6 +34,7 @@
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "http/rest_monitor_iface.h"
+#include "runtime/plan_fragment_executor.h"
#include "runtime/query_context.h"
#include "runtime_filter_mgr.h"
#include "util/countdown_latch.h"
@@ -151,6 +153,9 @@ public:
void get_runtime_query_info(std::vector<WorkloadQueryInfo>*
_query_info_list);
+ Status get_realtime_exec_status(const TUniqueId& query_id,
+ TReportExecStatusParams* exec_status);
+
private:
void cancel_unlocked_impl(const TUniqueId& id, const
PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock,
bool is_pipeline,
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 7bf4f3846dd..5c0668ef55c 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -25,11 +25,13 @@
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Planner_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
+#include <memory>
#include <ostream>
#include <typeinfo>
#include <utility>
@@ -638,10 +640,41 @@ void PlanFragmentExecutor::close() {
load_channel_profile()->pretty_print(&ss);
}
LOG(INFO) << ss.str();
+
+ _query_ctx->add_instance_profile(_fragment_instance_id,
+ collect_realtime_query_profile(),
+
collect_realtime_load_channel_profile());
}
}
_closed = true;
}
+// Returns a thrift snapshot of this instance's runtime profile, or nullptr when the
+// executor has no runtime state. compute_time_in_profile() is invoked on the profile
+// right before it is serialized.
+std::shared_ptr<TRuntimeProfileTree> PlanFragmentExecutor::collect_realtime_query_profile() const {
+    if (_runtime_state == nullptr) {
+        return nullptr;
+    }
+
+    auto profile_tree = std::make_shared<TRuntimeProfileTree>();
+    _runtime_state->runtime_profile()->compute_time_in_profile();
+    _runtime_state->runtime_profile()->to_thrift(profile_tree.get());
+    return profile_tree;
+}
+
+// Returns a thrift snapshot of this instance's load channel profile, or nullptr when
+// the executor has no runtime state. compute_time_in_profile() is invoked on the
+// profile right before it is serialized.
+std::shared_ptr<TRuntimeProfileTree> PlanFragmentExecutor::collect_realtime_load_channel_profile()
+        const {
+    if (_runtime_state == nullptr) {
+        return nullptr;
+    }
+
+    auto profile_tree = std::make_shared<TRuntimeProfileTree>();
+    _runtime_state->load_channel_profile()->compute_time_in_profile();
+    _runtime_state->load_channel_profile()->to_thrift(profile_tree.get());
+    return profile_tree;
+}
+
} // namespace doris
diff --git a/be/src/runtime/plan_fragment_executor.h
b/be/src/runtime/plan_fragment_executor.h
index 89b2534b61b..e4d29af9ae9 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -21,6 +21,7 @@
#pragma once
#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
@@ -146,6 +147,9 @@ public:
Status update_status(Status status);
+ std::shared_ptr<TRuntimeProfileTree> collect_realtime_query_profile()
const;
+ std::shared_ptr<TRuntimeProfileTree>
collect_realtime_load_channel_profile() const;
+
private:
ExecEnv* _exec_env = nullptr; // not owned
ExecNode* _plan = nullptr; // lives in _runtime_state->obj_pool()
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 10f54255741..0a76a13cac9 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -17,15 +17,31 @@
#include "runtime/query_context.h"
+#include <fmt/core.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
+
#include <exception>
#include <memory>
+#include <mutex>
+#include <sstream>
+#include <utility>
+#include "common/logging.h"
+#include "olap/olap_common.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/dependency.h"
+#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
#include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
+#include "util/uid_util.h"
namespace doris {
@@ -117,7 +133,11 @@ QueryContext::~QueryContext() {
}
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
- LOG_INFO("Query {} deconstructed, {}", print_id(_query_id),
mem_tracker_msg);
+
+ if (enable_profile()) {
+ _report_query_profile();
+ }
+
// Not release the the thread token in query context's dector method,
because the query
// conext may be dectored in the thread token it self. It is very
dangerous and may core.
// And also thread token need shutdown, it may take some time, may cause
the thread that
@@ -146,6 +166,8 @@ QueryContext::~QueryContext() {
_runtime_predicates.clear();
file_scan_range_params_map.clear();
obj_pool.clear();
+
+ LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id),
mem_tracker_msg);
}
void QueryContext::set_ready_to_execute(bool is_cancelled) {
@@ -303,4 +325,161 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr&
tg) {
return Status::OK();
}
+// Records the thrift profiles of one pipeline X fragment under its fragment id.
+//
+// fragment_id:          key used in _profile_map_x / _load_channel_profile_map_x.
+// pipeline_profiles:    profiles of the fragment; an empty list is rejected (logged,
+//                       DCHECK-ed) and nothing is stored.
+// load_channel_profile: optional; stored only when it is not nullptr.
+//
+// Both maps are modified while holding _profile_mutex. A fragment id that is already
+// present keeps its existing entry.
+void QueryContext::add_fragment_profile_x(
+        int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+    if (pipeline_profiles.empty()) {
+        std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}",
+                                      print_id(_query_id), fragment_id);
+        LOG_ERROR(msg);
+        DCHECK(false) << msg;
+        return;
+    }
+
+#ifndef NDEBUG
+    // Debug builds additionally verify that no individual profile is null.
+    for (const auto& pipeline_profile : pipeline_profiles) {
+        DCHECK(pipeline_profile != nullptr)
+                << fmt::format("Add pipeline profile failed, query {}, fragment {}",
+                               print_id(_query_id), fragment_id);
+    }
+#endif
+
+    std::lock_guard<std::mutex> guard(_profile_mutex);
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ",
+             print_id(_query_id), fragment_id, pipeline_profiles.size());
+
+    _profile_map_x.emplace(fragment_id, pipeline_profiles);
+
+    if (load_channel_profile != nullptr) {
+        _load_channel_profile_map_x.emplace(fragment_id, load_channel_profile);
+    }
+}
+
+// Records the thrift profile of one fragment instance (non pipeline X path) under its
+// instance id. The load channel profile is optional and stored only when not nullptr.
+// Both maps are modified while holding _profile_mutex.
+void QueryContext::add_instance_profile(const TUniqueId& instance_id,
+                                        std::shared_ptr<TRuntimeProfileTree> profile,
+                                        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+    DCHECK(profile != nullptr) << print_id(instance_id);
+
+    std::lock_guard<std::mutex> guard(_profile_mutex);
+    _profile_map.emplace(instance_id, profile);
+
+    if (load_channel_profile == nullptr) {
+        return;
+    }
+    _load_channel_profile_map.emplace(instance_id, load_channel_profile);
+}
+
+void QueryContext::_report_query_profile() { // Dispatches to both engine-specific reporters.
+    _report_query_profile_x(); // returns immediately unless pipeline X is enabled
+    _report_query_profile_non_pipeline(); // returns immediately if pipeline or pipeline X is enabled
+}
+
+// Hands every stored instance profile (and its load channel profile, if one was
+// stored for the same instance id) to the runtime query statistics manager, then
+// triggers a profile report. Does nothing when the pipeline or pipeline X engine is
+// enabled. Runs with _profile_mutex held.
+void QueryContext::_report_query_profile_non_pipeline() {
+    const bool is_non_pipeline_query = !enable_pipeline_exec() && !enable_pipeline_x_exec();
+    if (!is_non_pipeline_query) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> guard(_profile_mutex);
+    LOG_INFO("Query {}, register query profile, instance profile count {}", print_id(_query_id),
+             _profile_map.size());
+
+    for (auto& [instance_id, instance_profile] : _profile_map) {
+        // nullptr means no load channel profile was stored for this instance.
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
+        auto load_channel_iter = _load_channel_profile_map.find(instance_id);
+        if (load_channel_iter != _load_channel_profile_map.end()) {
+            load_channel_profile = load_channel_iter->second;
+        }
+
+        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
+                _query_id, coord_addr, instance_id, instance_profile, load_channel_profile);
+    }
+
+    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
+}
+
+// Hands every stored pipeline X fragment profile (and its load channel profile, if one
+// was stored for the same fragment id) to the runtime query statistics manager, then
+// triggers a profile report. Does nothing unless pipeline X is enabled.
+// Runs with _profile_mutex held.
+void QueryContext::_report_query_profile_x() {
+    if (!enable_pipeline_x_exec()) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> guard(_profile_mutex);
+    LOG_INFO(
+            "Pipeline x query context, register query profile, query {}, fragment profile count {}",
+            print_id(_query_id), _profile_map_x.size());
+
+    for (auto& [fragment_id, fragment_profile] : _profile_map_x) {
+        // nullptr means no load channel profile was stored for this fragment.
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
+        auto load_channel_iter = _load_channel_profile_map_x.find(fragment_id);
+        if (load_channel_iter != _load_channel_profile_map_x.end()) {
+            load_channel_profile = load_channel_iter->second;
+        }
+
+        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x(
+                _query_id, coord_addr, fragment_id, fragment_profile, load_channel_profile);
+    }
+
+    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
+}
+
+// Collects a realtime thrift profile from every pipeline X fragment context that is
+// still alive, keyed by fragment id. Fragments whose weak pointer has expired are
+// skipped silently; fragments that cannot be cast to PipelineXFragmentContext or that
+// yield no profile are logged, DCHECK-ed and skipped.
+// Returns an empty map when pipeline X is not enabled.
+std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
+QueryContext::_collect_realtime_query_profile_x() const {
+    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> profiles_by_fragment;
+
+    if (!enable_pipeline_x_exec()) {
+        return profiles_by_fragment;
+    }
+
+    for (auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) {
+        auto fragment_ctx = fragment_ctx_wptr.lock();
+        if (fragment_ctx == nullptr) {
+            continue;
+        }
+
+        // In theory the cast cannot fail, since the pipeline X engine was checked above.
+        auto fragment_ctx_x =
+                std::dynamic_pointer_cast<pipeline::PipelineXFragmentContext>(fragment_ctx);
+        if (fragment_ctx_x == nullptr) {
+            std::string msg =
+                    fmt::format("PipelineXFragmentContext is nullptr, query {} fragment_id: {}",
+                                print_id(_query_id), fragment_id);
+            LOG_ERROR(msg);
+            DCHECK(false) << msg;
+            continue;
+        }
+
+        auto fragment_profiles = fragment_ctx_x->collect_realtime_profile_x();
+        if (fragment_profiles.empty()) {
+            std::string err_msg =
+                    fmt::format("Get nothing when collecting profile, query {}, fragment_id: {}",
+                                print_id(_query_id), fragment_id);
+            LOG_ERROR(err_msg);
+            DCHECK(false) << err_msg;
+            continue;
+        }
+
+        profiles_by_fragment.emplace(fragment_id, std::move(fragment_profiles));
+    }
+
+    return profiles_by_fragment;
+}
+
+// Assembles the report params for a pipeline X query from the realtime fragment
+// profiles plus every non-null load channel profile stored so far.
+// For a query that is not a pipeline X query, the error is logged and DCHECK-ed and a
+// default-constructed TReportExecStatusParams is returned.
+// NOTE(review): _load_channel_profile_map_x is read here without _profile_mutex, while
+// add_fragment_profile_x() modifies it under that mutex -- confirm this is safe.
+TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const {
+    TReportExecStatusParams exec_status;
+
+    if (!enable_pipeline_x_exec()) {
+        auto msg = fmt::format("Query {} is not pipelineX query", print_id(_query_id));
+        LOG_ERROR(msg);
+        DCHECK(false) << msg;
+        return exec_status;
+    }
+
+    auto realtime_query_profile = _collect_realtime_query_profile_x();
+
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
+    for (const auto& fragment_entry : _load_channel_profile_map_x) {
+        if (fragment_entry.second != nullptr) {
+            load_channel_profiles.push_back(fragment_entry.second);
+        }
+    }
+
+    exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+            _query_id, realtime_query_profile, load_channel_profiles);
+    return exec_status;
+}
+
} // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c78886997d0..b9a7e780375 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -18,11 +18,14 @@
#pragma once
#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
#include <gen_cpp/Types_types.h>
#include <atomic>
#include <memory>
+#include <mutex>
#include <string>
+#include <unordered_map>
#include "common/config.h"
#include "common/factory_creator.h"
@@ -32,6 +35,7 @@
#include "runtime/query_statistics.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_predicate.h"
+#include "util/hash_util.hpp"
#include "util/threadpool.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/shared_hash_table_controller.h"
@@ -238,6 +242,8 @@ public:
ThreadPool* get_non_pipe_exec_thread_pool();
+ std::vector<TUniqueId> get_fragment_instance_ids() const { return
fragment_instance_ids; }
+
int64_t mem_limit() const { return _bytes_limit; }
void set_merge_controller_handler(
@@ -351,6 +357,58 @@ private:
std::mutex _weighted_mem_lock;
int64_t _weighted_consumption = 0;
int64_t _weighted_limit = 0;
+
+ std::mutex _profile_mutex;
+
+ // when fragment of pipeline x is closed, it will register its profile to
this map by using add_fragment_profile_x
+ // flatten profile of one fragment:
+ // Pipeline 0
+ // PipelineTask 0
+ // Operator 1
+ // Operator 2
+ // Scanner
+ // PipelineTask 1
+ // Operator 1
+ // Operator 2
+ // Scanner
+ // Pipeline 1
+ // PipelineTask 2
+ // Operator 3
+ // PipelineTask 3
+ // Operator 3
+ // fragment_id -> list<profile>
+ std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_profile_map_x;
+ std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>>
_load_channel_profile_map_x;
+
+ // instance_id -> profile
+ std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>
_profile_map;
+ std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>
_load_channel_profile_map;
+
+ void _report_query_profile();
+ void _report_query_profile_non_pipeline();
+ void _report_query_profile_x();
+
+ std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
+ _collect_realtime_query_profile_x() const;
+
+ std::unordered_map<TUniqueId,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
+ _collect_realtime_query_profile_non_pipeline() const;
+
+public:
+ // when fragment of pipeline x is closed, it will register its profile to
this map by using add_fragment_profile_x
+ void add_fragment_profile_x(
+ int fragment_id,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
pipeline_profile,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
+
+ void add_instance_profile(const TUniqueId& iid,
std::shared_ptr<TRuntimeProfileTree> profile,
+ std::shared_ptr<TRuntimeProfileTree>
load_channel_profile);
+
+ TReportExecStatusParams get_realtime_exec_status_x() const;
+
+ bool enable_profile() const {
+ return _query_options.__isset.enable_profile &&
_query_options.enable_profile;
+ }
};
} // namespace doris
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 9764b0f0507..a59938b7d51 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -17,13 +17,434 @@
#include "runtime/runtime_query_statistics_mgr.h"
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
#include "util/debug_util.h"
+#include "util/hash_util.hpp"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
+// Sends one TReportExecStatusParams request to the frontend at coor_addr and
+// stores the reply in res.
+//
+// A transport exception on the first attempt triggers one reopen of the
+// connection followed by a single retry. Every other failure is logged and
+// mapped to Status::RpcError; Status::OK() means the call itself went through.
+//
+// TODO: Currently this function is only used to report profile.
+// In the future, all exec status and query statistics should be reported
+// through this function.
+static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
+                                        const TReportExecStatusParams& req,
+                                        TReportExecStatusResult& res) {
+    Status conn_status;
+    FrontendServiceConnection fe_conn(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+                                      &conn_status);
+    if (!conn_status.ok()) {
+        LOG_WARNING(
+                "Could not get client rpc client of {} when reporting profiles, reason is {}, "
+                "not reporting, profile will be lost",
+                PrintThriftNetworkAddress(coor_addr), conn_status.to_string());
+        return Status::RpcError("Client rpc client failed");
+    }
+
+    try {
+        try {
+            fe_conn->reportExecStatus(res, req);
+        } catch (const apache::thrift::transport::TTransportException& transport_err) {
+            // The cached connection may be stale: reopen it and try once more.
+            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+                        PrintThriftNetworkAddress(coor_addr), transport_err.what());
+            conn_status = fe_conn.reopen(config::thrift_rpc_timeout_ms);
+            if (!conn_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", conn_status.to_string());
+                return Status::RpcError("Open rpc client failed");
+            }
+
+            fe_conn->reportExecStatus(res, req);
+        }
+    } catch (const apache::thrift::TApplicationException& app_err) {
+        const bool unknown_method =
+                app_err.getType() == apache::thrift::TApplicationException::UNKNOWN_METHOD;
+        if (unknown_method) {
+            LOG_WARNING(
+                    "Failed to report query profile to {} due to {}, usually because the frontend "
+                    "is not upgraded, check the version",
+                    PrintThriftNetworkAddress(coor_addr), app_err.what());
+        } else {
+            LOG_WARNING(
+                    "Failed to report query profile to {}, reason: {}, you can see fe log for "
+                    "details.",
+                    PrintThriftNetworkAddress(coor_addr), app_err.what());
+        }
+        return Status::RpcError("Send stats failed");
+    } catch (const std::exception& generic_err) {
+        LOG_WARNING(
+                "Failed to report query profile to {}, reason: {}, you can see fe log for details.",
+                PrintThriftNetworkAddress(coor_addr), generic_err.what());
+        return Status::RpcError("Send report query profile failed");
+    }
+
+    return Status::OK();
+}
+
+// Builds the report request for one pipeline x query.
+//
+// query_id               id stored inside the TQueryProfile payload.
+// fragment_id_to_profile fragment id -> profiles of the pipelines of that fragment.
+// load_channel_profiles  load channel profiles to attach; the field is left
+//                        unset when none of them is usable.
+//
+// Null profile pointers are reported (DCHECK + error log) and skipped.
+// Profiles are constructed in place and moved into the request map so that
+// each profile tree is not deep-copied more often than the thrift setters
+// require.
+TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+        const TUniqueId& query_id,
+        const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+                fragment_id_to_profile,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profiles) {
+    TQueryProfile profile;
+    profile.__set_query_id(query_id);
+
+    std::map<int32_t, std::vector<TDetailedReportParams>> fragment_id_to_profile_req;
+
+    for (const auto& entry : fragment_id_to_profile) {
+        const int32_t fragment_id = entry.first;
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& fragment_profile = entry.second;
+        std::vector<TDetailedReportParams> detailed_params;
+        detailed_params.reserve(fragment_profile.size());
+
+        // Iterate by reference: copying a shared_ptr costs an atomic refcount update.
+        for (const auto& pipeline_profile : fragment_profile) {
+            if (pipeline_profile == nullptr) {
+                auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
+                                       print_id(query_id), fragment_id);
+                DCHECK(false) << msg;
+                LOG_ERROR(msg);
+                continue;
+            }
+
+            // tmp.fragment_instance_id is not needed for pipeline x
+            detailed_params.emplace_back().__set_profile(*pipeline_profile);
+        }
+
+        fragment_id_to_profile_req.emplace(fragment_id, std::move(detailed_params));
+    }
+
+    if (fragment_id_to_profile_req.empty()) {
+        LOG_WARNING("No fragment profile found for query {}", print_id(query_id));
+    }
+
+    profile.__set_fragment_id_to_profile(fragment_id_to_profile_req);
+
+    std::vector<TRuntimeProfileTree> load_channel_profiles_req;
+    load_channel_profiles_req.reserve(load_channel_profiles.size());
+    for (const auto& load_channel_profile : load_channel_profiles) {
+        if (load_channel_profile == nullptr) {
+            auto msg = fmt::format(
+                    "Register fragment profile {} {} failed, load channel profile is null",
+                    print_id(query_id), -1);
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+            continue;
+        }
+
+        load_channel_profiles_req.push_back(*load_channel_profile);
+    }
+
+    if (!load_channel_profiles_req.empty()) {
+        profile.__set_load_channel_profiles(load_channel_profiles_req);
+    }
+
+    TReportExecStatusParams req;
+    req.__set_query_profile(profile);
+    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    // The top-level query id is deliberately left invalid (default-constructed)
+    // to avoid API compatibility problems during upgrade; the real id is the
+    // one inside query_profile.
+    req.__set_query_id(TUniqueId());
+
+    return req;
+}
+
+// Builds the report request for one non-pipeline query.
+//
+// query_id               id stored inside the TQueryProfile payload.
+// instance_id_to_profile fragment instance id -> profile of that instance.
+//                        Sent as two parallel lists (ids / profiles) whose
+//                        elements pair up by index.
+// load_channel_profile   load channel profiles to attach; the field is left
+//                        unset when none of them is usable.
+//
+// Null profile pointers are reported (DCHECK + error log) and skipped.
+TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+        const TUniqueId& query_id,
+        const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
+                instance_id_to_profile,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile) {
+    TQueryProfile profile;
+    std::vector<TUniqueId> fragment_instance_ids;
+    std::vector<TRuntimeProfileTree> instance_profiles;
+    fragment_instance_ids.reserve(instance_id_to_profile.size());
+    instance_profiles.reserve(instance_id_to_profile.size());
+
+    for (const auto& entry : instance_id_to_profile) {
+        const TUniqueId& instance_id = entry.first;
+        const std::shared_ptr<TRuntimeProfileTree>& instance_profile = entry.second;
+
+        if (instance_profile == nullptr) {
+            auto msg = fmt::format("Register instance profile {} {} failed, profile is null",
+                                   print_id(query_id), print_id(instance_id));
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+            continue;
+        }
+
+        // Keep both lists in lock-step: index i of one describes index i of the other.
+        fragment_instance_ids.push_back(instance_id);
+        instance_profiles.push_back(*instance_profile);
+    }
+
+    // Forward the caller-provided load channel profiles. Previously this
+    // parameter was ignored and an always-empty list was sent instead.
+    std::vector<TRuntimeProfileTree> load_channel_profiles;
+    load_channel_profiles.reserve(load_channel_profile.size());
+    for (const auto& channel_profile : load_channel_profile) {
+        if (channel_profile == nullptr) {
+            auto msg = fmt::format(
+                    "Register fragment profile {} {} failed, load channel profile is null",
+                    print_id(query_id), -1);
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+            continue;
+        }
+
+        load_channel_profiles.push_back(*channel_profile);
+    }
+
+    profile.__set_query_id(query_id);
+    profile.__set_fragment_instance_ids(fragment_instance_ids);
+    profile.__set_instance_profiles(instance_profiles);
+    if (!load_channel_profiles.empty()) {
+        profile.__set_load_channel_profiles(load_channel_profiles);
+    }
+
+    TReportExecStatusParams res;
+    res.__set_query_profile(profile);
+    // The top-level query id is deliberately left invalid (default-constructed)
+    // to avoid API compatibility problems during upgrade; the real id is the
+    // one inside query_profile.
+    res.__set_query_id(TUniqueId());
+    res.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    return res;
+}
+
+// Spawns config::report_exec_status_thread_num worker threads, each running
+// report_query_profiles_thread(). Calling it a second time is a programming
+// error: it trips a DCHECK, logs an error and starts nothing.
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+    // exchange() tests and sets the flag in one atomic step, so two concurrent
+    // callers can never both pass the check (a separate load() + store() could).
+    if (started.exchange(true)) {
+        DCHECK(false) << "report thread has been started";
+        LOG_ERROR("report thread has been started");
+        return;
+    }
+
+    for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) {
+        this->_report_profile_threads.emplace_back(std::make_unique<std::thread>(
+                &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this));
+    }
+}
+
+// Body of every report worker thread.
+//
+// Each round sleeps until a profile is registered, a stop is requested, or the
+// 3 second poll interval elapses with pending work, then runs one reporting
+// pass. The stop flag is checked only after the pass, so profiles registered
+// before the stop request are still flushed once before the thread exits.
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+    // The profile maps are written under _query_profile_map_lock (see the
+    // register_* functions), not under _report_profile_mutex, so they must be
+    // inspected under that lock as well. Lock order here is
+    // _report_profile_mutex -> _query_profile_map_lock.
+    // NOTE(review): no code visible here takes them in the opposite order;
+    // confirm this holds for callers in other files.
+    const auto has_pending_profile = [this]() {
+        std::shared_lock<std::shared_mutex> read_guard(_query_profile_map_lock);
+        return !_query_profile_map.empty() || !_profile_map_x.empty();
+    };
+
+    while (true) {
+        {
+            std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+            while (!has_pending_profile() && !_report_profile_thread_stop) {
+                _report_profile_cv.wait_for(lock, std::chrono::seconds(3));
+            }
+        }
+
+        _report_query_profiles_function();
+
+        {
+            std::lock_guard<std::mutex> lg(_report_profile_mutex);
+
+            if (_report_profile_thread_stop) {
+                LOG_INFO("Report profile thread stopped");
+                return;
+            }
+        }
+    }
+}
+
+// Wakes up one waiting report thread. The notification is issued while
+// holding _report_profile_mutex.
+void RuntimeQueryStatiticsMgr::trigger_report_profile() {
+    std::lock_guard<std::mutex> guard(_report_profile_mutex);
+    _report_profile_cv.notify_one();
+}
+
+// Asks every report thread to stop and blocks until all of them have exited.
+// Does nothing when the threads were never started.
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+    if (!started) {
+        return;
+    }
+
+    {
+        // Raise the stop flag under the mutex the workers wait on, then wake
+        // all of them so they can observe it.
+        std::lock_guard<std::mutex> guard(_report_profile_mutex);
+        _report_profile_thread_stop = true;
+        LOG_INFO("All report threads are going to stop");
+        _report_profile_cv.notify_all();
+    }
+
+    for (const auto& worker : _report_profile_threads) {
+        worker->join();
+    }
+
+    LOG_INFO("All report threads stopped");
+}
+
+// Stores the profile of one non-pipeline fragment instance so that a report
+// thread can send it later.
+//
+// The first registration for a query also records coor_addr; later calls for
+// the same query keep the address already stored. An instance id that is
+// already present keeps its existing profile. A non-null load channel profile
+// is stored (or replaced) under the instance id.
+// A null instance_profile is rejected with DCHECK + error log.
+void RuntimeQueryStatiticsMgr::register_instance_profile(
+        const TUniqueId& query_id, const TNetworkAddress& coor_addr, const TUniqueId& instance_id,
+        std::shared_ptr<TRuntimeProfileTree> instance_profile,
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+    if (instance_profile == nullptr) {
+        auto msg = fmt::format("Register instance profile {} {} failed, profile is null",
+                               print_id(query_id), print_id(instance_id));
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return;
+    }
+
+    using InstanceProfileMap = std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>;
+
+    std::lock_guard<std::shared_mutex> write_guard(_query_profile_map_lock);
+
+    // Creates {coor_addr, empty map} only when the query is not known yet.
+    auto query_slot = _query_profile_map.try_emplace(query_id, coor_addr, InstanceProfileMap())
+                              .first;
+    std::get<1>(query_slot->second).emplace(instance_id, instance_profile);
+
+    if (load_channel_profile != nullptr) {
+        _load_channel_profile_map.insert_or_assign(instance_id, load_channel_profile);
+    }
+
+    LOG_INFO("Register instance profile {} {}", print_id(query_id), print_id(instance_id));
+}
+
+// Stores the pipeline profiles of one pipeline x fragment so that a report
+// thread can send them later.
+//
+// The whole registration is rejected (DCHECK + error log) as soon as one
+// entry of p_profiles is null. The first registration for a query also
+// records coor_addr; a fragment id that is already present keeps its existing
+// profiles. A non-null load channel profile is stored (or replaced) under
+// the (query_id, fragment_id) key.
+void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
+        const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id,
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x) {
+    for (size_t idx = 0; idx < p_profiles.size(); ++idx) {
+        if (p_profiles[idx] != nullptr) {
+            continue;
+        }
+
+        auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
+                               print_id(query_id), fragment_id);
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return;
+    }
+
+    using FragmentProfileMap =
+            std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>;
+
+    std::lock_guard<std::shared_mutex> write_guard(_query_profile_map_lock);
+
+    // Creates {coor_addr, empty map} only when the query is not known yet.
+    auto query_slot =
+            _profile_map_x.try_emplace(query_id, coor_addr, FragmentProfileMap()).first;
+    std::get<1>(query_slot->second).emplace(fragment_id, p_profiles);
+
+    if (load_channel_profile_x != nullptr) {
+        _load_channel_profile_map_x.insert_or_assign(std::make_pair(query_id, fragment_id),
+                                                     load_channel_profile_x);
+    }
+
+    LOG_INFO("register x profile done {}, fragment {}, profiles {}", print_id(query_id),
+             fragment_id, p_profiles.size());
+}
+
+// Sends every registered non-pipeline profile to its coordinator, one RPC per
+// query.
+//
+// The registered maps are swapped out under _query_profile_map_lock, so the
+// (slow) RPCs run without holding the lock and each profile is reported by
+// exactly one thread.
+void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() {
+    // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
+    decltype(_query_profile_map) profile_copy;
+    // instance_id -> load channel profile
+    decltype(_load_channel_profile_map) load_channel_profile_copy;
+
+    {
+        std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+        _query_profile_map.swap(profile_copy);
+        _load_channel_profile_map.swap(load_channel_profile_copy);
+    }
+
+    for (const auto& entry : profile_copy) {
+        const auto& query_id = entry.first;
+        const auto& coor_addr = std::get<0>(entry.second);
+        const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
+                instance_id_to_profile = std::get<1>(entry.second);
+
+        // Attach only the load channel profiles of this query's own instances.
+        // The swapped-out map holds the entries of every query in this batch.
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
+        for (const auto& profile_entry : instance_id_to_profile) {
+            const auto& instance_id = profile_entry.first;
+            const auto& instance_profile = profile_entry.second;
+
+            if (instance_profile == nullptr) {
+                auto msg = fmt::format("Query {} instance {} profile is null", print_id(query_id),
+                                       print_id(instance_id));
+                DCHECK(false) << msg;
+                LOG_ERROR(msg);
+                continue;
+            }
+
+            const auto channel_it = load_channel_profile_copy.find(instance_id);
+            if (channel_it == load_channel_profile_copy.end()) {
+                continue;
+            }
+
+            if (channel_it->second == nullptr) {
+                auto msg = fmt::format(
+                        "Register fragment profile {} {} failed, load channel profile is null",
+                        print_id(query_id), -1);
+                DCHECK(false) << msg;
+                LOG_ERROR(msg);
+                continue;
+            }
+
+            load_channel_profiles.push_back(channel_it->second);
+        }
+
+        TReportExecStatusParams req = create_report_exec_status_params_non_pipeline(
+                query_id, instance_id_to_profile, load_channel_profiles);
+        TReportExecStatusResult res;
+        auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
+
+        if (!rpc_status.ok()) {
+            // When the RPC itself fails, res was never filled in, so its status
+            // must not be used to judge the outcome.
+            LOG_WARNING("Query {} send profile to {} failed, msg: {}", print_id(query_id),
+                        PrintThriftNetworkAddress(coor_addr), rpc_status.to_string());
+        } else if (res.status.status_code != TStatusCode::OK) {
+            std::stringstream ss;
+            res.status.printTo(ss);
+            LOG_WARNING("Query {} send profile to {} failed, msg: {}", print_id(query_id),
+                        PrintThriftNetworkAddress(coor_addr), ss.str());
+        } else {
+            LOG_INFO("Send {} profile finished", print_id(query_id));
+        }
+    }
+}
+
+// Sends every registered pipeline x profile to its coordinator, one RPC per
+// query.
+//
+// The registered maps are swapped out under _query_profile_map_lock, so the
+// (slow) RPCs run without holding the lock and each profile is reported by
+// exactly one thread.
+void RuntimeQueryStatiticsMgr::_report_query_profiles_x() {
+    // query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}}
+    decltype(_profile_map_x) profile_copy;
+    // (query_id, fragment_id) -> load channel profile
+    decltype(_load_channel_profile_map_x) load_channel_profile_copy;
+
+    {
+        std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+        _profile_map_x.swap(profile_copy);
+        _load_channel_profile_map_x.swap(load_channel_profile_copy);
+    }
+
+    for (const auto& entry : profile_copy) {
+        const auto& query_id = entry.first;
+        const auto& coor_addr = std::get<0>(entry.second);
+        const auto& fragment_profile_map = std::get<1>(entry.second);
+
+        if (fragment_profile_map.empty()) {
+            auto msg = fmt::format("Query {} does not have profile", print_id(query_id));
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+            continue;
+        }
+
+        // Attach only the load channel profiles registered for this query.
+        // The swapped-out map holds the entries of every query in this batch.
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
+        for (const auto& channel_entry : load_channel_profile_copy) {
+            const auto& channel_key = channel_entry.first;
+            if (!(channel_key.first == query_id)) {
+                continue;
+            }
+
+            if (channel_entry.second == nullptr) {
+                auto msg = fmt::format(
+                        "Register fragment profile {} {} failed, load channel profile is null",
+                        print_id(query_id), channel_key.second);
+                DCHECK(false) << msg;
+                LOG_ERROR(msg);
+                continue;
+            }
+
+            load_channel_profiles.push_back(channel_entry.second);
+        }
+
+        TReportExecStatusParams req = create_report_exec_status_params_x(
+                query_id, fragment_profile_map, load_channel_profiles);
+        TReportExecStatusResult res;
+
+        auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
+
+        // Both layers can fail: the RPC itself (rpc_status) and the frontend's
+        // handling of the request (res.status). res is only meaningful when the
+        // RPC went through.
+        if (!rpc_status.ok() || res.status.status_code != TStatusCode::OK) {
+            LOG_WARNING("Query {} send profile to {} failed", print_id(query_id),
+                        PrintThriftNetworkAddress(coor_addr));
+        } else {
+            LOG_INFO("Send {} profile succeed", print_id(query_id));
+        }
+    }
+}
void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
QueryStatistics tmp_qs;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
index 1b3e164d48f..088dd39be55 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -18,12 +18,22 @@
#pragma once
#include <gen_cpp/Data_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Types_types.h>
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
#include <shared_mutex>
#include <string>
+#include <thread>
+#include <unordered_map>
+#include "gutil/integral_types.h"
#include "runtime/query_statistics.h"
#include "runtime/workload_management/workload_condition.h"
+#include "util/hash_util.hpp"
#include "util/time.h"
namespace doris {
@@ -57,6 +67,18 @@ public:
RuntimeQueryStatiticsMgr() = default;
~RuntimeQueryStatiticsMgr() = default;
+ static TReportExecStatusParams create_report_exec_status_params_x(
+ const TUniqueId& q_id,
+ const std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ fragment_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile);
+
+ static TReportExecStatusParams
create_report_exec_status_params_non_pipeline(
+ const TUniqueId& q_id,
+ const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
+ instance_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile);
+
void register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr);
@@ -75,9 +97,57 @@ public:
// used for backend_active_tasks
void get_active_be_tasks_block(vectorized::Block* block);
+ void start_report_thread();
+ void report_query_profiles_thread();
+ void trigger_report_profile();
+ void stop_report_thread();
+
+ void register_instance_profile(const TUniqueId& query_id, const
TNetworkAddress& coor_addr,
+ const TUniqueId& instance_id,
+ std::shared_ptr<TRuntimeProfileTree>
instance_profile,
+ std::shared_ptr<TRuntimeProfileTree>
load_channel_profile);
+
+    // Registers the pipeline profiles (and the optional load channel profile)
+    // of one pipeline x fragment for later reporting to coor_addr.
+    void register_fragment_profile_x(const TUniqueId& query_id, const TNetworkAddress& coor_addr,
+                                     int32_t fragment_id,
+                                     std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
+                                     std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x);
+
private:
std::shared_mutex _qs_ctx_map_lock;
std::map<std::string, std::unique_ptr<QueryStatisticsCtx>>
_query_statistics_ctx_map;
+
+ std::mutex _report_profile_mutex;
+ std::atomic_bool started = false;
+ std::vector<std::unique_ptr<std::thread>> _report_profile_threads;
+ std::condition_variable _report_profile_cv;
+ bool _report_profile_thread_stop = false;
+
+ // Runs one reporting pass: pipeline x profiles first, then the
+ // non-pipeline profiles.
+ void _report_query_profiles_function() {
+ _report_query_profiles_x();
+ _report_query_profiles_non_pipeline();
+ }
+
+ void _report_query_profiles_x();
+ void _report_query_profiles_non_pipeline();
+
+ std::shared_mutex _query_profile_map_lock;
+
+ // query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}}
+ std::unordered_map<
+ TUniqueId,
+ std::tuple<TNetworkAddress,
+ std::unordered_map<int,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>>>
+ _profile_map_x;
+ std::unordered_map<std::pair<TUniqueId, int32_t>,
std::shared_ptr<TRuntimeProfileTree>>
+ _load_channel_profile_map_x;
+
+ // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
+ std::unordered_map<
+ TUniqueId,
+ std::tuple<TNetworkAddress,
+ std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>>>
+ _query_profile_map;
+ std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>
_load_channel_profile_map;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index b1a110144ef..8aab496ed2f 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -23,6 +23,7 @@
#include <gen_cpp/BackendService_types.h>
#include <gen_cpp/Data_types.h>
#include <gen_cpp/DorisExternalService_types.h>
+#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Planner_types.h>
#include <gen_cpp/Status_types.h>
@@ -1169,4 +1170,29 @@ void
BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
response.__set_status(Status::NotSupported("warm_up_tablets is not
implemented").to_thrift());
}
+// Thrift handler: returns the current exec status / profile of the query
+// identified by request.id.
+//
+// response.status is always set. report_exec_status_params is set only on
+// success, and its top-level query id is reset to an invalid
+// (default-constructed) id.
+void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
+                                                  const TGetRealtimeExecStatusRequest& request) {
+    if (!request.__isset.id) {
+        LOG_WARNING("Invalidate argument, id is empty");
+        response.__set_status(Status::InvalidArgument("id is empty").to_thrift());
+        // Stop here: without an id there is nothing to look up, and falling
+        // through would overwrite the error status set above.
+        return;
+    }
+
+    LOG_INFO("Getting realtime exec status of query {}", print_id(request.id));
+    TReportExecStatusParams report_exec_status_params;
+    Status st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
+            request.id, &report_exec_status_params);
+
+    if (!st.ok()) {
+        response.__set_status(st.to_thrift());
+        return;
+    }
+
+    report_exec_status_params.__set_query_id(TUniqueId());
+
+    response.__set_status(Status::OK().to_thrift());
+    response.__set_report_exec_status_params(report_exec_status_params);
+}
+
} // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 20aaa96685a..b670b21221e 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -137,6 +137,9 @@ public:
void query_ingest_binlog(TQueryIngestBinlogResult& result,
const TQueryIngestBinlogRequest& request)
override;
+ void get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
+ const TGetRealtimeExecStatusRequest&
request) override;
+
////////////////////////////////////////////////////////////////////////////
// begin cloud backend functions
////////////////////////////////////////////////////////////////////////////
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index d1db6ff43e2..bdd51a1a8cc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -19,6 +19,7 @@ package org.apache.doris.common.profile;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
@@ -27,7 +28,10 @@ import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryProfile;
import org.apache.doris.thrift.TReportExecStatusParams;
+import org.apache.doris.thrift.TRuntimeProfileTree;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUnit;
@@ -245,6 +249,81 @@ public class ExecutionProfile {
}
}
+    /**
+     * Merges a profile reported by one backend through the TQueryProfile API into this
+     * execution profile.
+     *
+     * <p>For a pipeline x profile, fragment_id_to_profile is consumed: every reported pipeline
+     * becomes a new child node of its fragment profile and the nodes are recorded per backend.
+     * Otherwise the parallel lists fragment_instance_ids / instance_profiles are consumed and
+     * each instance profile updates the matching, already registered instance node.
+     * Load channel profiles, when present, are merged in both cases.
+     *
+     * @param profile profile payload sent by the backend
+     * @param backendHBAddress heartbeat address of the reporting backend
+     * @return OK on success, INVALID_ARGUMENT as soon as the payload is found to be incomplete
+     *         or to reference an unknown fragment or instance (earlier entries stay applied)
+     */
+    public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddress) {
+        if (isPipelineXProfile) {
+            if (!profile.isSetFragmentIdToProfile()) {
+                return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfile is not set");
+            }
+
+            for (Entry<Integer, List<TDetailedReportParams>> fragmentEntry
+                    : profile.getFragmentIdToProfile().entrySet()) {
+                int fragmentId = fragmentEntry.getKey();
+                List<TDetailedReportParams> pipelineParams = fragmentEntry.getValue();
+                List<RuntimeProfile> pipelineNodes = Lists.newArrayList();
+
+                for (int pipelineIdx = 0; pipelineIdx < pipelineParams.size(); pipelineIdx++) {
+                    TDetailedReportParams pipelineParam = pipelineParams.get(pipelineIdx);
+                    RuntimeProfile pipelineNode = new RuntimeProfile(
+                            "Pipeline :" + pipelineIdx + " " + " (host=" + backendHBAddress + ")");
+                    pipelineNodes.add(pipelineNode);
+                    if (!pipelineParam.isSetProfile()) {
+                        return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set");
+                    }
+
+                    pipelineNode.update(pipelineParam.profile);
+                    fragmentProfiles.get(fragmentId).addChild(pipelineNode);
+                }
+                multiBeProfile.get(fragmentId).put(backendHBAddress, pipelineNodes);
+            }
+        } else {
+            if (!(profile.isSetInstanceProfiles() && profile.isSetFragmentInstanceIds())) {
+                return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile is not set");
+            }
+
+            List<TUniqueId> reportedIds = profile.getFragmentInstanceIds();
+            List<TRuntimeProfileTree> reportedProfiles = profile.getInstanceProfiles();
+            if (reportedIds.size() != reportedProfiles.size()) {
+                return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile size is not equal");
+            }
+
+            // The two lists are parallel: element i of one belongs to element i of the other.
+            for (int idx = 0; idx < reportedIds.size(); idx++) {
+                TUniqueId instanceId = reportedIds.get(idx);
+                TRuntimeProfileTree reportedProfile = reportedProfiles.get(idx);
+                if (reportedProfile == null) {
+                    return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set");
+                }
+
+                PlanFragmentId fragmentId = instanceIdToFragmentId.get(instanceId);
+                if (fragmentId == null) {
+                    LOG.warn("Could not find related fragment for instance {}",
+                            DebugUtil.printId(instanceId));
+                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related fragment");
+                }
+
+                // Do not use fragment id in params, because non-pipeline engine will set it to -1
+                Map<TUniqueId, RuntimeProfile> knownInstances = fragmentInstancesProfiles.get(fragmentId);
+                if (knownInstances == null) {
+                    LOG.warn("Could not find related instances for fragment {}", fragmentId);
+                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance");
+                }
+
+                RuntimeProfile targetProfile = knownInstances.get(instanceId);
+                if (targetProfile == null) {
+                    LOG.warn("Could not find related profile {}", DebugUtil.printId(instanceId));
+                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance");
+                }
+
+                targetProfile.update(reportedProfile);
+            }
+        }
+
+        if (profile.isSetLoadChannelProfiles()) {
+            for (TRuntimeProfileTree reportedChannelProfile : profile.getLoadChannelProfiles()) {
+                this.loadChannelProfile.update(reportedChannelProfile);
+            }
+        }
+
+        return new Status(TStatusCode.OK, "Success");
+    }
+
+
public void updateProfile(TReportExecStatusParams params) {
Backend backend = null;
if (params.isSetBackendId()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index a4cd867a31e..eca02175fcf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
@@ -26,7 +27,9 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
+import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryProfile;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
@@ -71,6 +74,27 @@ public final class QeProcessorImpl implements QeProcessor {
"profile-write-pool", true);
}
+ private Status processQueryProfile(TQueryProfile profile, TNetworkAddress
address) {
+ LOG.info("New profile processing API, query {}",
DebugUtil.printId(profile.query_id));
+
+ ExecutionProfile executionProfile =
ProfileManager.getInstance().getExecutionProfile(profile.query_id);
+ if (executionProfile == null) {
+ LOG.warn("Could not find execution profile with query id {}",
DebugUtil.printId(profile.query_id));
+ return new Status(TStatusCode.NOT_FOUND, "Could not find execution
profile with query id "
+ + DebugUtil.printId(profile.query_id));
+ }
+
+ // Update profile may cost a lot of time, use a seperate pool to deal
with it.
+ writeProfileExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ executionProfile.updateProfile(profile, address);
+ }
+ });
+
+ return Status.OK;
+ }
+
@Override
public Coordinator getCoordinator(TUniqueId queryId) {
QueryInfo queryInfo = coordinatorMap.get(queryId);
@@ -206,6 +230,15 @@ public final class QeProcessorImpl implements QeProcessor {
@Override
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams
params, TNetworkAddress beAddr) {
+ if (params.isSetQueryProfile()) {
+ if (params.isSetBackendId()) {
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(params.getBackendId());
+ if (backend != null) {
+ processQueryProfile(params.getQueryProfile(),
backend.getHeartbeatAddress());
+ }
+ }
+ }
+
if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
LOG.info("ReportExecStatus(): fragment_instance_id={}, query
id={}, backend num: {}, ip: {}",
DebugUtil.printId(params.fragment_instance_id),
DebugUtil.printId(params.query_id),
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index eb9ac858b3e..81d444a716a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -31,6 +31,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
+import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
+import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
import org.apache.doris.thrift.TIngestBinlogRequest;
@@ -273,6 +275,12 @@ public class GenericPoolTest {
public TWarmUpTabletsResponse warmUpTablets(TWarmUpTabletsRequest
request) throws TException {
return null;
}
+
+ // Test stub: satisfies the backend service interface and always returns null.
+ @Override
+ public TGetRealtimeExecStatusResponse
getRealtimeExecStatus(TGetRealtimeExecStatusRequest request)
+ throws TException {
+ return null;
+ }
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index c705893c672..441595511dc 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -48,6 +48,8 @@ import org.apache.doris.thrift.TExportState;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
+import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
import org.apache.doris.thrift.THeartbeatResult;
@@ -456,6 +458,12 @@ public class MockedBackendFactory {
throws TException {
return null;
}
+
+ // Mock implementation: the request is ignored and null is always returned.
+ @Override
+ public TGetRealtimeExecStatusResponse
getRealtimeExecStatus(TGetRealtimeExecStatusRequest request)
+ throws TException {
+ return null;
+ }
}
// The default Brpc service.
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index fb45ed24809..69918985a16 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -24,6 +24,7 @@ include "PlanNodes.thrift"
include "AgentService.thrift"
include "PaloInternalService.thrift"
include "DorisExternalService.thrift"
+include "FrontendService.thrift"
struct TExportTaskRequest {
1: required PaloInternalService.TExecPlanFragmentParams params
@@ -319,6 +320,16 @@ struct TPublishTopicResult {
1: required Status.TStatus status
}
+// Request of BackendService.get_realtime_exec_status.
+struct TGetRealtimeExecStatusRequest {
+ // maybe query id or other unique id
+ // The backend answers with an InvalidArgument status when this field is unset.
+ 1: optional Types.TUniqueId id
+}
+
+// Response of BackendService.get_realtime_exec_status.
+struct TGetRealtimeExecStatusResponse {
+ // Outcome of the request; the backend handler always sets it.
+ 1: optional Status.TStatus status
+ // Only set by the backend when the status is OK.
+ 2: optional FrontendService.TReportExecStatusParams
report_exec_status_params
+}
+
service BackendService {
// Called by coord to start asynchronous execution of plan fragment in
backend.
// Returns as soon as all incoming data streams have been set up.
@@ -387,4 +398,6 @@ service BackendService {
TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest
query_ingest_binlog_request);
TPublishTopicResult publish_topic_info(1:TPublishTopicRequest
topic_request);
+
+ TGetRealtimeExecStatusResponse
get_realtime_exec_status(1:TGetRealtimeExecStatusRequest request);
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 61d34d04184..09cd2bf0be2 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -416,6 +416,20 @@ struct TReportWorkloadRuntimeStatusParams {
2: optional map<string, TQueryStatistics> query_statistics_map
}
+// Profile of one query as collected on one backend, carried in
+// TReportExecStatusParams.query_profile.
+struct TQueryProfile {
+ 1: optional Types.TUniqueId query_id
+
+ // Pipeline x: fragment id -> one entry per pipeline profile of that fragment.
+ 2: optional map<i32, list<TDetailedReportParams>> fragment_id_to_profile
+
+ // Non-pipeline: Types.TUniqueId should not be used as key in thrift map, so
+ // we use two parallel lists instead; fragment_instance_ids[i] is the
+ // instance that instance_profiles[i] belongs to.
+ // https://thrift.apache.org/docs/types#containers
+ 3: optional list<Types.TUniqueId> fragment_instance_ids
+ // Must have the same length as fragment_instance_ids.
+ 4: optional list<RuntimeProfile.TRuntimeProfileTree> instance_profiles
+
+ 5: optional list<RuntimeProfile.TRuntimeProfileTree> load_channel_profiles
+}
+
// The results of an INSERT query, sent to the coordinator as part of
// TReportExecStatusParams
struct TReportExecStatusParams {
@@ -443,7 +457,7 @@ struct TReportExecStatusParams {
// cumulative profile
// required in V1
// Move to TDetailedReportParams for pipelineX
- 7: optional RuntimeProfile.TRuntimeProfileTree profile
+ 7: optional RuntimeProfile.TRuntimeProfileTree profile // to be deprecated
// New errors that have not been reported to the coordinator
// optional in V1
@@ -473,17 +487,19 @@ struct TReportExecStatusParams {
20: optional PaloInternalService.TQueryType query_type
// Move to TDetailedReportParams for pipelineX
- 21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
+ 21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile // to be
deprecated
22: optional i32 finished_scan_ranges
- 23: optional list<TDetailedReportParams> detailed_report
+ 23: optional list<TDetailedReportParams> detailed_report // to be deprecated
24: optional TQueryStatistics query_statistics // deprecated
25: optional TReportWorkloadRuntimeStatusParams
report_workload_runtime_status
26: optional list<DataSinks.THivePartitionUpdate> hive_partition_updates
+
+ 27: optional TQueryProfile query_profile
}
struct TFeResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]