github-actions[bot] commented on code in PR #33383:
URL: https://github.com/apache/doris/pull/33383#discussion_r1555672271
##########
be/src/runtime/fragment_mgr.h:
##########
@@ -17,6 +17,7 @@
#pragma once
+#include <gen_cpp/FrontendService_types.h>
Review Comment:
warning: 'gen_cpp/FrontendService_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/FrontendService_types.h>
^
```
##########
be/src/runtime/plan_fragment_executor.cpp:
##########
@@ -644,4 +646,31 @@ void PlanFragmentExecutor::close() {
_closed = true;
}
+std::shared_ptr<TRuntimeProfileTree>
PlanFragmentExecutor::collect_realtime_query_profile() const {
Review Comment:
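warning: method 'collect_realtime_query_profile' can be made static
[readability-convert-member-functions-to-static]
Note (review): this looks like a false positive. The body of this method (quoted in the next plan_fragment_executor.cpp hunk) reads `_runtime_state`, which appears to be a data member, so the method cannot be made static. This lint run also reports the generated `gen_cpp/*` headers as not found, which may be why members were not resolved.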
be/src/runtime/plan_fragment_executor.h:149:
```diff
- std::shared_ptr<TRuntimeProfileTree> collect_realtime_query_profile()
const;
+ static std::shared_ptr<TRuntimeProfileTree>
collect_realtime_query_profile() ;
```
```suggestion
std::shared_ptr<TRuntimeProfileTree>
PlanFragmentExecutor::collect_realtime_query_profile() {
```
##########
be/src/runtime/query_context.cpp:
##########
@@ -297,4 +319,161 @@
return Status::OK();
}
+void QueryContext::add_fragment_profile_x(
+ int fragment_id, const
std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+ if (pipeline_profiles.empty()) {
+ std::string msg = fmt::format("Add pipeline profile failed, query {},
fragment {}",
+ print_id(this->_query_id), fragment_id);
+ LOG_ERROR(msg);
+ DCHECK(false) << msg;
+ return;
+ }
+
+#ifndef NDEBUG
+ for (const auto& p : pipeline_profiles) {
+ DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed,
query {}, fragment {}",
+ print_id(this->_query_id),
fragment_id);
+ }
+#endif
+
+ std::lock_guard<std::mutex> l(_profile_mutex);
+ LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline
profile count {} ",
+ print_id(this->_query_id), fragment_id, pipeline_profiles.size());
+
+ _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
+
+ if (load_channel_profile != nullptr) {
+ _load_channel_profile_map_x.insert(std::make_pair(fragment_id,
load_channel_profile));
+ }
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& instance_id,
+ std::shared_ptr<TRuntimeProfileTree>
profile,
+ std::shared_ptr<TRuntimeProfileTree>
load_channel_profile) {
+ DCHECK(profile != nullptr) << print_id(instance_id);
+
+ std::lock_guard<std::mutex> lg(_profile_mutex);
+ _profile_map.insert(std::make_pair(instance_id, profile));
+ if (load_channel_profile != nullptr) {
+ _load_channel_profile_map.insert(std::make_pair(instance_id,
load_channel_profile));
+ }
+}
+
+void QueryContext::_report_query_profile() {
+ _report_query_profile_x();
+ _report_query_profile_non_pipeline();
+}
+
+void QueryContext::_report_query_profile_non_pipeline() {
Review Comment:
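warning: method '_report_query_profile_non_pipeline' can be made const
[readability-make-member-function-const]
Note (review): verify before applying. The body (quoted in a later query_context.cpp hunk) locks `_profile_mutex` with `std::lock_guard<std::mutex>` and reads `_load_channel_profile_map[instance_id]`. In a const member function the mutex would need to be `mutable`. If the map is a `std::map`/`std::unordered_map`, `operator[]` has no const overload, so the lookup would need to use `find()` or `at()` instead.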
```suggestion
void QueryContext::_report_query_profile_non_pipeline() const {
```
be/src/runtime/query_context.h:387:
```diff
- void _report_query_profile_non_pipeline();
+ void _report_query_profile_non_pipeline() const;
```
##########
be/src/runtime/plan_fragment_executor.cpp:
##########
@@ -644,4 +646,31 @@
_closed = true;
}
+std::shared_ptr<TRuntimeProfileTree>
PlanFragmentExecutor::collect_realtime_query_profile() const {
+ std::shared_ptr<TRuntimeProfileTree> res =
std::make_shared<TRuntimeProfileTree>();
+
+ if (_runtime_state != nullptr) {
+ _runtime_state->runtime_profile()->compute_time_in_profile();
+ _runtime_state->runtime_profile()->to_thrift(res.get());
+ } else {
+ return nullptr;
+ }
+
+ return res;
+}
+
+std::shared_ptr<TRuntimeProfileTree>
PlanFragmentExecutor::collect_realtime_load_channel_profile()
Review Comment:
warning: method 'collect_realtime_load_channel_profile' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/plan_fragment_executor.h:150:
```diff
- std::shared_ptr<TRuntimeProfileTree>
collect_realtime_load_channel_profile() const;
+ static std::shared_ptr<TRuntimeProfileTree>
collect_realtime_load_channel_profile() ;
```
be/src/runtime/plan_fragment_executor.cpp:662:
```diff
- const {
+ {
```
##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,432 @@
#include "runtime/runtime_query_statistics_mgr.h"
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
#include "util/debug_util.h"
+#include "util/hash_util.hpp"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
+static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
+ const TReportExecStatusParams& req,
+ TReportExecStatusResult& res) {
+ Status client_status;
+ FrontendServiceConnection
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+ &client_status);
+ if (!client_status.ok()) {
+ LOG_WARNING(
+ "could not get client rpc client of {} when reporting
profiles, reason is {}, "
+ "not reporting, profile will be lost",
+ PrintThriftNetworkAddress(coor_addr),
client_status.to_string());
+ return Status::RpcError("Client rpc client failed");
+ }
+
+ try {
+ try {
+ rpc_client->reportExecStatus(res, req);
+ } catch (const apache::thrift::transport::TTransportException& e) {
+ LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ LOG_WARNING("Reopen failed, reason: {}",
client_status.to_string());
+ return Status::RpcError("Open rpc client failed");
+ }
+
+ rpc_client->reportExecStatus(res, req);
+ }
+ } catch (apache::thrift::TApplicationException& e) {
+ if (e.getType() == e.UNKNOWN_METHOD) {
+ LOG_WARNING(
+ "Failed to send statistics to {} due to {}, usually
because the frontend "
+ "is not upgraded, check the version",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ } else {
+ LOG_WARNING(
+ "Failed to send statistics to {}, reason: {}, you can see
fe log for "
+ "details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ }
+ return Status::RpcError("Send stats failed");
+ } catch (std::exception& e) {
+ LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see
fe log for details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ return Status::RpcError("Send stats failed");
+ }
+
+ return Status::OK();
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+ const TUniqueId& query_id,
+ const std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ fragment_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profiles) {
+ TQueryProfile profile;
+ profile.__set_query_id(query_id);
+
+ std::map<int32_t, std::vector<TDetailedReportParams>>
fragment_id_to_profile_req;
+
+ for (const auto& entry : fragment_id_to_profile) {
+ int32_t fragment_id = entry.first;
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
fragment_profile = entry.second;
+ std::vector<TDetailedReportParams> detailed_params;
+
+ for (auto pipeline_profile : fragment_profile) {
+ if (pipeline_profile == nullptr) {
+ auto msg = fmt::format("Register fragment profile {} {}
failed, profile is null",
+ print_id(query_id), fragment_id);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ TDetailedReportParams tmp;
+ tmp.__set_profile(*pipeline_profile);
+ // tmp.fragment_instance_id is not needed for pipeline x
+ detailed_params.push_back(tmp);
+ }
+
+ fragment_id_to_profile_req.insert(std::make_pair(fragment_id,
detailed_params));
+ }
+
+ if (fragment_id_to_profile_req.size() == 0) {
+ LOG_WARNING("No fragment profile found for query {}",
print_id(query_id));
+ }
+
+ profile.__set_fragment_id_to_profile(fragment_id_to_profile_req);
+
+ std::vector<TRuntimeProfileTree> load_channel_profiles_req;
+ for (auto load_channel_profile : load_channel_profiles) {
+ if (load_channel_profile == nullptr) {
+ auto msg = fmt::format(
+ "Register fragment profile {} {} failed, load channel
profile is null",
+ print_id(query_id), -1);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ load_channel_profiles_req.push_back(*load_channel_profile);
+ }
+
+ if (load_channel_profiles_req.size() > 0) {
+ profile.__set_load_channel_profiles(load_channel_profiles_req);
+ }
+
+ TReportExecStatusParams req;
+ req.__set_query_profile(profile);
+ req.__set_query_id(TUniqueId());
+
+ return req;
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+ const TUniqueId& query_id,
+ const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
+ instance_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile) {
+ TQueryProfile profile;
+ std::vector<TUniqueId> fragment_instance_ids;
+ std::vector<TRuntimeProfileTree> instance_profiles;
+ std::vector<TRuntimeProfileTree> load_channel_profiles;
+
+ for (auto entry : instance_id_to_profile) {
+ TUniqueId instance_id = entry.first;
+ std::shared_ptr<TRuntimeProfileTree> profile = entry.second;
+
+ if (profile == nullptr) {
+ auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
+ print_id(query_id), print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ fragment_instance_ids.push_back(instance_id);
+ instance_profiles.push_back(*profile);
+ }
+
+ profile.__set_query_id(query_id);
+ profile.__set_fragment_instance_ids(fragment_instance_ids);
+ profile.__set_instance_profiles(instance_profiles);
+ profile.__set_load_channel_profiles(load_channel_profiles);
+
+ TReportExecStatusParams res;
+ res.__set_query_profile(profile);
+ res.__set_query_id(TUniqueId());
+ return res;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+ if (started.load()) {
+ DCHECK(false) << "report thread has been started";
+ LOG_ERROR("report thread has been started");
+ return;
+ }
+
+ started.store(true);
+
+ for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) {
+
this->_report_profile_threads.emplace_back(std::make_unique<std::thread>(
+ &RuntimeQueryStatiticsMgr::report_query_profiles_thread,
this));
+ }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+ while (_query_profile_map.empty() && _profile_map_x.empty() &&
+ !_report_profile_thread_stop) {
+ _report_profile_cv.wait(lock);
+ }
+ }
+
+ _report_query_profiles_function();
+
+ {
+ std::lock_guard<std::mutex> lg(_report_profile_mutex);
+
+ if (_report_profile_thread_stop) {
+ LOG_INFO("Report profile thread stopped");
+ return;
+ }
+ }
+ }
+}
+
+void RuntimeQueryStatiticsMgr::trigger_report_profile() {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+ _report_profile_cv.notify_one();
+}
+
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+ if (!started) {
+ return;
+ }
+
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+ _report_profile_thread_stop = true;
+ LOG_INFO("All report threads are going to stop");
+ _report_profile_cv.notify_all();
+ }
+
+ for (const auto& thread : _report_profile_threads) {
+ thread->join();
+ }
+
+ LOG_INFO("All report threads stopped");
+}
+
+void RuntimeQueryStatiticsMgr::register_instance_profile(
+ const TUniqueId& query_id, const TNetworkAddress& coor_addr, const
TUniqueId& instance_id,
+ std::shared_ptr<TRuntimeProfileTree> instance_profile,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+ if (instance_profile == nullptr) {
+ auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
+ print_id(query_id), print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ return;
+ }
+
+ std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+ if (!_query_profile_map.contains(query_id)) {
+ _query_profile_map[query_id] = std::make_tuple(
+ coor_addr, std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>());
+ }
+
+ std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
instance_profile_map =
+ std::get<1>(_query_profile_map[query_id]);
+ instance_profile_map.insert(std::make_pair(instance_id, instance_profile));
+
+ if (load_channel_profile != nullptr) {
+ _load_channel_profile_map[instance_id] = load_channel_profile;
+ }
+
+ LOG_INFO("Register instance profile {} {}", print_id(query_id),
print_id(instance_id));
+}
+
+void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
Review Comment:
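warning: method 'register_fragment_profile_x' can be made static
[readability-convert-member-functions-to-static]
Note (review): do not apply this suggestion as written. `static` may only appear on the member declaration inside the class, not on an out-of-class definition (clang reports "'static' can only be specified inside the class definition"). The body (quoted in the next runtime_query_statistics_mgr.cpp hunk) also appears to use instance members such as `_query_profile_map_lock`, `_profile_map_x` and `_load_channel_profile_map_x`, so this is likely a false positive.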
```suggestion
static void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
```
##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,432 @@
#include "runtime/runtime_query_statistics_mgr.h"
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
#include "util/debug_util.h"
+#include "util/hash_util.hpp"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
+static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
+ const TReportExecStatusParams& req,
+ TReportExecStatusResult& res) {
+ Status client_status;
+ FrontendServiceConnection
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+ &client_status);
+ if (!client_status.ok()) {
+ LOG_WARNING(
+ "could not get client rpc client of {} when reporting
profiles, reason is {}, "
+ "not reporting, profile will be lost",
+ PrintThriftNetworkAddress(coor_addr),
client_status.to_string());
+ return Status::RpcError("Client rpc client failed");
+ }
+
+ try {
+ try {
+ rpc_client->reportExecStatus(res, req);
+ } catch (const apache::thrift::transport::TTransportException& e) {
+ LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ LOG_WARNING("Reopen failed, reason: {}",
client_status.to_string());
+ return Status::RpcError("Open rpc client failed");
+ }
+
+ rpc_client->reportExecStatus(res, req);
+ }
+ } catch (apache::thrift::TApplicationException& e) {
+ if (e.getType() == e.UNKNOWN_METHOD) {
+ LOG_WARNING(
+ "Failed to send statistics to {} due to {}, usually
because the frontend "
+ "is not upgraded, check the version",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ } else {
+ LOG_WARNING(
+ "Failed to send statistics to {}, reason: {}, you can see
fe log for "
+ "details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ }
+ return Status::RpcError("Send stats failed");
+ } catch (std::exception& e) {
+ LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see
fe log for details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ return Status::RpcError("Send stats failed");
+ }
+
+ return Status::OK();
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+ const TUniqueId& query_id,
+ const std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ fragment_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profiles) {
+ TQueryProfile profile;
+ profile.__set_query_id(query_id);
+
+ std::map<int32_t, std::vector<TDetailedReportParams>>
fragment_id_to_profile_req;
+
+ for (const auto& entry : fragment_id_to_profile) {
+ int32_t fragment_id = entry.first;
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
fragment_profile = entry.second;
+ std::vector<TDetailedReportParams> detailed_params;
+
+ for (auto pipeline_profile : fragment_profile) {
+ if (pipeline_profile == nullptr) {
+ auto msg = fmt::format("Register fragment profile {} {}
failed, profile is null",
+ print_id(query_id), fragment_id);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ TDetailedReportParams tmp;
+ tmp.__set_profile(*pipeline_profile);
+ // tmp.fragment_instance_id is not needed for pipeline x
+ detailed_params.push_back(tmp);
+ }
+
+ fragment_id_to_profile_req.insert(std::make_pair(fragment_id,
detailed_params));
+ }
+
+ if (fragment_id_to_profile_req.size() == 0) {
+ LOG_WARNING("No fragment profile found for query {}",
print_id(query_id));
+ }
+
+ profile.__set_fragment_id_to_profile(fragment_id_to_profile_req);
+
+ std::vector<TRuntimeProfileTree> load_channel_profiles_req;
+ for (auto load_channel_profile : load_channel_profiles) {
+ if (load_channel_profile == nullptr) {
+ auto msg = fmt::format(
+ "Register fragment profile {} {} failed, load channel
profile is null",
+ print_id(query_id), -1);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ load_channel_profiles_req.push_back(*load_channel_profile);
+ }
+
+ if (load_channel_profiles_req.size() > 0) {
+ profile.__set_load_channel_profiles(load_channel_profiles_req);
+ }
+
+ TReportExecStatusParams req;
+ req.__set_query_profile(profile);
+ req.__set_query_id(TUniqueId());
+
+ return req;
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+ const TUniqueId& query_id,
+ const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
+ instance_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile) {
+ TQueryProfile profile;
+ std::vector<TUniqueId> fragment_instance_ids;
+ std::vector<TRuntimeProfileTree> instance_profiles;
+ std::vector<TRuntimeProfileTree> load_channel_profiles;
+
+ for (auto entry : instance_id_to_profile) {
+ TUniqueId instance_id = entry.first;
+ std::shared_ptr<TRuntimeProfileTree> profile = entry.second;
+
+ if (profile == nullptr) {
+ auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
+ print_id(query_id), print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ fragment_instance_ids.push_back(instance_id);
+ instance_profiles.push_back(*profile);
+ }
+
+ profile.__set_query_id(query_id);
+ profile.__set_fragment_instance_ids(fragment_instance_ids);
+ profile.__set_instance_profiles(instance_profiles);
+ profile.__set_load_channel_profiles(load_channel_profiles);
+
+ TReportExecStatusParams res;
+ res.__set_query_profile(profile);
+ res.__set_query_id(TUniqueId());
+ return res;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+ if (started.load()) {
+ DCHECK(false) << "report thread has been started";
+ LOG_ERROR("report thread has been started");
+ return;
+ }
+
+ started.store(true);
+
+ for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) {
+
this->_report_profile_threads.emplace_back(std::make_unique<std::thread>(
+ &RuntimeQueryStatiticsMgr::report_query_profiles_thread,
this));
+ }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+ while (_query_profile_map.empty() && _profile_map_x.empty() &&
+ !_report_profile_thread_stop) {
+ _report_profile_cv.wait(lock);
+ }
+ }
+
+ _report_query_profiles_function();
+
+ {
+ std::lock_guard<std::mutex> lg(_report_profile_mutex);
+
+ if (_report_profile_thread_stop) {
+ LOG_INFO("Report profile thread stopped");
+ return;
+ }
+ }
+ }
+}
+
+void RuntimeQueryStatiticsMgr::trigger_report_profile() {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+ _report_profile_cv.notify_one();
+}
+
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+ if (!started) {
+ return;
+ }
+
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+ _report_profile_thread_stop = true;
+ LOG_INFO("All report threads are going to stop");
+ _report_profile_cv.notify_all();
+ }
+
+ for (const auto& thread : _report_profile_threads) {
+ thread->join();
+ }
+
+ LOG_INFO("All report threads stopped");
+}
+
+void RuntimeQueryStatiticsMgr::register_instance_profile(
+ const TUniqueId& query_id, const TNetworkAddress& coor_addr, const
TUniqueId& instance_id,
+ std::shared_ptr<TRuntimeProfileTree> instance_profile,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+ if (instance_profile == nullptr) {
+ auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
+ print_id(query_id), print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ return;
+ }
+
+ std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+ if (!_query_profile_map.contains(query_id)) {
+ _query_profile_map[query_id] = std::make_tuple(
+ coor_addr, std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>());
+ }
+
+ std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
instance_profile_map =
+ std::get<1>(_query_profile_map[query_id]);
+ instance_profile_map.insert(std::make_pair(instance_id, instance_profile));
+
+ if (load_channel_profile != nullptr) {
+ _load_channel_profile_map[instance_id] = load_channel_profile;
+ }
+
+ LOG_INFO("Register instance profile {} {}", print_id(query_id),
print_id(instance_id));
+}
+
+void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
+ const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t
fragment_id,
+ std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x) {
+ for (const auto& p : p_profiles) {
+ if (p == nullptr) {
+ auto msg = fmt::format("Register fragment profile {} {} failed,
profile is null",
+ print_id(query_id), fragment_id);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ return;
+ }
+ }
+
+ std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+ if (!_profile_map_x.contains(query_id)) {
+ _profile_map_x[query_id] = std::make_tuple(
+ coor_addr,
+ std::unordered_map<int,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>());
+ }
+
+ std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ fragment_profile_map = std::get<1>(_profile_map_x[query_id]);
+ fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles));
+
+ if (load_channel_profile_x != nullptr) {
+ _load_channel_profile_map_x[std::make_pair(query_id, fragment_id)] =
load_channel_profile_x;
+ }
+
+ LOG_INFO("register x profile done {}, fragment {}, profiles {}",
print_id(query_id),
+ fragment_id, p_profiles.size());
+}
+
+void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() {
Review Comment:
warning: method '_report_query_profiles_non_pipeline' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/runtime_query_statistics_mgr.h:130:
```diff
- void _report_query_profiles_non_pipeline();
+ static void _report_query_profiles_non_pipeline();
```
##########
be/src/runtime/query_context.cpp:
##########
@@ -297,4 +319,161 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr&
tg) {
return Status::OK();
}
+void QueryContext::add_fragment_profile_x(
Review Comment:
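warning: method 'add_fragment_profile_x' can be made static
[readability-convert-member-functions-to-static]
Note (review): false positive. The body (quoted in the other query_context.cpp hunks) dereferences `this->_query_id` and locks `_profile_mutex`, so it cannot be static. The suggested out-of-class `static` is also ill-formed C++, because `static` belongs only on the in-class declaration.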
```suggestion
static void QueryContext::add_fragment_profile_x(
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1609,4 +1620,59 @@ void
FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i
}
}
+Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
Review Comment:
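warning: method 'get_realtime_exec_status' can be made static
[readability-convert-member-functions-to-static]
Note (review): the suggestion as written is ill-formed C++. `static` may only appear on the in-class declaration in fragment_mgr.h, not on the out-of-class definition. The method body is not quoted here, so check whether it uses FragmentMgr state before making it static.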
```suggestion
static Status FragmentMgr::get_realtime_exec_status(const TUniqueId&
query_id,
```
##########
be/src/runtime/query_context.cpp:
##########
@@ -297,4 +319,161 @@
return Status::OK();
}
+void QueryContext::add_fragment_profile_x(
+ int fragment_id, const
std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+ if (pipeline_profiles.empty()) {
+ std::string msg = fmt::format("Add pipeline profile failed, query {},
fragment {}",
+ print_id(this->_query_id), fragment_id);
+ LOG_ERROR(msg);
+ DCHECK(false) << msg;
+ return;
+ }
+
+#ifndef NDEBUG
+ for (const auto& p : pipeline_profiles) {
+ DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed,
query {}, fragment {}",
+ print_id(this->_query_id),
fragment_id);
+ }
+#endif
+
+ std::lock_guard<std::mutex> l(_profile_mutex);
+ LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline
profile count {} ",
+ print_id(this->_query_id), fragment_id, pipeline_profiles.size());
+
+ _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
+
+ if (load_channel_profile != nullptr) {
+ _load_channel_profile_map_x.insert(std::make_pair(fragment_id,
load_channel_profile));
+ }
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& instance_id,
+ std::shared_ptr<TRuntimeProfileTree>
profile,
+ std::shared_ptr<TRuntimeProfileTree>
load_channel_profile) {
+ DCHECK(profile != nullptr) << print_id(instance_id);
+
+ std::lock_guard<std::mutex> lg(_profile_mutex);
+ _profile_map.insert(std::make_pair(instance_id, profile));
+ if (load_channel_profile != nullptr) {
+ _load_channel_profile_map.insert(std::make_pair(instance_id,
load_channel_profile));
+ }
+}
+
+void QueryContext::_report_query_profile() {
+ _report_query_profile_x();
+ _report_query_profile_non_pipeline();
+}
+
+void QueryContext::_report_query_profile_non_pipeline() {
+ if (enable_pipeline_exec()) {
+ return;
+ }
+
+ std::lock_guard<std::mutex> lg(_profile_mutex);
+ LOG_INFO("Query {}, register query profile, instance profile count {}",
print_id(_query_id),
+ _profile_map.size());
+
+ for (auto& [instance_id, instance_profile] : _profile_map) {
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
+ if (_load_channel_profile_map.contains(instance_id)) {
+ load_channel_profile = _load_channel_profile_map[instance_id];
+ }
+
+
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
+ _query_id, this->coord_addr, instance_id, instance_profile,
load_channel_profile);
+ }
+
+
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
+}
+
+void QueryContext::_report_query_profile_x() {
Review Comment:
warning: method '_report_query_profile_x' can be made const
[readability-make-member-function-const]
```suggestion
void QueryContext::_report_query_profile_x() const {
```
be/src/runtime/query_context.h:388:
```diff
- void _report_query_profile_x();
+ void _report_query_profile_x() const;
```
##########
be/src/runtime/plan_fragment_executor.h:
##########
@@ -21,6 +21,7 @@
#pragma once
#include <gen_cpp/PaloInternalService_types.h>
Review Comment:
warning: 'gen_cpp/PaloInternalService_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/PaloInternalService_types.h>
^
```
##########
be/src/runtime/query_context.h:
##########
@@ -18,11 +18,14 @@
#pragma once
#include <gen_cpp/PaloInternalService_types.h>
Review Comment:
warning: 'gen_cpp/PaloInternalService_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/PaloInternalService_types.h>
^
```
##########
be/src/runtime/query_context.cpp:
##########
@@ -297,4 +319,161 @@
return Status::OK();
}
+void QueryContext::add_fragment_profile_x(
+ int fragment_id, const
std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+ if (pipeline_profiles.empty()) {
+ std::string msg = fmt::format("Add pipeline profile failed, query {},
fragment {}",
+ print_id(this->_query_id), fragment_id);
+ LOG_ERROR(msg);
+ DCHECK(false) << msg;
+ return;
+ }
+
+#ifndef NDEBUG
+ for (const auto& p : pipeline_profiles) {
+ DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed,
query {}, fragment {}",
+ print_id(this->_query_id),
fragment_id);
+ }
+#endif
+
+ std::lock_guard<std::mutex> l(_profile_mutex);
+ LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline
profile count {} ",
+ print_id(this->_query_id), fragment_id, pipeline_profiles.size());
+
+ _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
+
+ if (load_channel_profile != nullptr) {
+ _load_channel_profile_map_x.insert(std::make_pair(fragment_id,
load_channel_profile));
+ }
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& instance_id,
Review Comment:
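warning: method 'add_instance_profile' can be made static
[readability-convert-member-functions-to-static]
Note (review): false positive. The body (quoted in an earlier query_context.cpp hunk) locks `_profile_mutex` and inserts into `_profile_map` and `_load_channel_profile_map`, which appear to be instance members. The suggested out-of-class `static` is also ill-formed C++, because `static` belongs only on the in-class declaration.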
```suggestion
static void QueryContext::add_instance_profile(const TUniqueId& instance_id,
```
##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,432 @@
#include "runtime/runtime_query_statistics_mgr.h"
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
#include "util/debug_util.h"
+#include "util/hash_util.hpp"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
+static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
+ const TReportExecStatusParams& req,
+ TReportExecStatusResult& res) {
+ Status client_status;
+ FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+ &client_status);
+ if (!client_status.ok()) {
+ LOG_WARNING(
+ "could not get client rpc client of {} when reporting profiles, reason is {}, "
+ "not reporting, profile will be lost",
+ PrintThriftNetworkAddress(coor_addr), client_status.to_string());
+ return Status::RpcError("Client rpc client failed");
+ }
+
+ try {
+ try {
+ rpc_client->reportExecStatus(res, req);
+ } catch (const apache::thrift::transport::TTransportException& e) {
+ LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ LOG_WARNING("Reopen failed, reason: {}", client_status.to_string());
+ return Status::RpcError("Open rpc client failed");
+ }
+
+ rpc_client->reportExecStatus(res, req);
+ }
+ } catch (apache::thrift::TApplicationException& e) {
+ if (e.getType() == e.UNKNOWN_METHOD) {
+ LOG_WARNING(
+ "Failed to send statistics to {} due to {}, usually because the frontend "
+ "is not upgraded, check the version",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ } else {
+ LOG_WARNING(
+ "Failed to send statistics to {}, reason: {}, you can see fe log for "
+ "details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ }
+ return Status::RpcError("Send stats failed");
+ } catch (std::exception& e) {
+ LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see fe log for details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ return Status::RpcError("Send stats failed");
+ }
+
+ return Status::OK();
+}
+
+TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+ const TUniqueId& query_id,
+ const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ fragment_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profiles) {
+ TQueryProfile profile;
+ profile.__set_query_id(query_id);
+
+ std::map<int32_t, std::vector<TDetailedReportParams>>
fragment_id_to_profile_req;
+
+ for (const auto& entry : fragment_id_to_profile) {
+ int32_t fragment_id = entry.first;
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
fragment_profile = entry.second;
+ std::vector<TDetailedReportParams> detailed_params;
+
+ for (auto pipeline_profile : fragment_profile) {
+ if (pipeline_profile == nullptr) {
+ auto msg = fmt::format("Register fragment profile {} {}
failed, profile is null",
+ print_id(query_id), fragment_id);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ TDetailedReportParams tmp;
+ tmp.__set_profile(*pipeline_profile);
+ // tmp.fragment_instance_id is not needed for pipeline x
+ detailed_params.push_back(tmp);
+ }
+
+ fragment_id_to_profile_req.insert(std::make_pair(fragment_id,
detailed_params));
+ }
+
+ if (fragment_id_to_profile_req.size() == 0) {
+ LOG_WARNING("No fragment profile found for query {}",
print_id(query_id));
+ }
+
+ profile.__set_fragment_id_to_profile(fragment_id_to_profile_req);
+
+ std::vector<TRuntimeProfileTree> load_channel_profiles_req;
+ for (auto load_channel_profile : load_channel_profiles) {
+ if (load_channel_profile == nullptr) {
+ auto msg = fmt::format(
+ "Register fragment profile {} {} failed, load channel
profile is null",
+ print_id(query_id), -1);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ load_channel_profiles_req.push_back(*load_channel_profile);
+ }
+
+ if (load_channel_profiles_req.size() > 0) {
+ profile.__set_load_channel_profiles(load_channel_profiles_req);
+ }
+
+ TReportExecStatusParams req;
+ req.__set_query_profile(profile);
+ req.__set_query_id(TUniqueId());
+
+ return req;
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+ const TUniqueId& query_id,
+ const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
+ instance_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile) {
+ TQueryProfile profile;
+ std::vector<TUniqueId> fragment_instance_ids;
+ std::vector<TRuntimeProfileTree> instance_profiles;
+ std::vector<TRuntimeProfileTree> load_channel_profiles;
+
+ for (auto entry : instance_id_to_profile) {
+ TUniqueId instance_id = entry.first;
+ std::shared_ptr<TRuntimeProfileTree> profile = entry.second;
+
+ if (profile == nullptr) {
+ auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
+ print_id(query_id), print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ fragment_instance_ids.push_back(instance_id);
+ instance_profiles.push_back(*profile);
+ }
+
+ profile.__set_query_id(query_id);
+ profile.__set_fragment_instance_ids(fragment_instance_ids);
+ profile.__set_instance_profiles(instance_profiles);
+ profile.__set_load_channel_profiles(load_channel_profiles);
+
+ TReportExecStatusParams res;
+ res.__set_query_profile(profile);
+ res.__set_query_id(TUniqueId());
+ return res;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+ if (started.load()) {
+ DCHECK(false) << "report thread has been started";
+ LOG_ERROR("report thread has been started");
+ return;
+ }
+
+ started.store(true);
+
+ for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) {
+
this->_report_profile_threads.emplace_back(std::make_unique<std::thread>(
+ &RuntimeQueryStatiticsMgr::report_query_profiles_thread,
this));
+ }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+ while (_query_profile_map.empty() && _profile_map_x.empty() &&
+ !_report_profile_thread_stop) {
+ _report_profile_cv.wait(lock);
+ }
+ }
+
+ _report_query_profiles_function();
+
+ {
+ std::lock_guard<std::mutex> lg(_report_profile_mutex);
+
+ if (_report_profile_thread_stop) {
+ LOG_INFO("Report profile thread stopped");
+ return;
+ }
+ }
+ }
+}
+
+void RuntimeQueryStatiticsMgr::trigger_report_profile() {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+ _report_profile_cv.notify_one();
+}
+
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+ if (!started) {
+ return;
+ }
+
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+ _report_profile_thread_stop = true;
+ LOG_INFO("All report threads are going to stop");
+ _report_profile_cv.notify_all();
+ }
+
+ for (const auto& thread : _report_profile_threads) {
+ thread->join();
+ }
+
+ LOG_INFO("All report threads stopped");
+}
+
+void RuntimeQueryStatiticsMgr::register_instance_profile(
Review Comment:
warning: method 'register_instance_profile' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void RuntimeQueryStatiticsMgr::register_instance_profile(
```
##########
be/src/service/backend_service.cpp:
##########
@@ -1129,4 +1130,29 @@ void BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift());
}
+void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
Review Comment:
warning: method 'get_realtime_exec_status' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
```
##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,432 @@
#include "runtime/runtime_query_statistics_mgr.h"
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
#include "util/debug_util.h"
+#include "util/hash_util.hpp"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
+static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
+ const TReportExecStatusParams& req,
+ TReportExecStatusResult& res) {
+ Status client_status;
+ FrontendServiceConnection
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+ &client_status);
+ if (!client_status.ok()) {
+ LOG_WARNING(
+ "could not get client rpc client of {} when reporting
profiles, reason is {}, "
+ "not reporting, profile will be lost",
+ PrintThriftNetworkAddress(coor_addr),
client_status.to_string());
+ return Status::RpcError("Client rpc client failed");
+ }
+
+ try {
+ try {
+ rpc_client->reportExecStatus(res, req);
+ } catch (const apache::thrift::transport::TTransportException& e) {
+ LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ LOG_WARNING("Reopen failed, reason: {}",
client_status.to_string());
+ return Status::RpcError("Open rpc client failed");
+ }
+
+ rpc_client->reportExecStatus(res, req);
+ }
+ } catch (apache::thrift::TApplicationException& e) {
+ if (e.getType() == e.UNKNOWN_METHOD) {
+ LOG_WARNING(
+ "Failed to send statistics to {} due to {}, usually
because the frontend "
+ "is not upgraded, check the version",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ } else {
+ LOG_WARNING(
+ "Failed to send statistics to {}, reason: {}, you can see
fe log for "
+ "details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ }
+ return Status::RpcError("Send stats failed");
+ } catch (std::exception& e) {
+ LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see
fe log for details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ return Status::RpcError("Send stats failed");
+ }
+
+ return Status::OK();
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+ const TUniqueId& query_id,
+ const std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ fragment_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profiles) {
+ TQueryProfile profile;
+ profile.__set_query_id(query_id);
+
+ std::map<int32_t, std::vector<TDetailedReportParams>>
fragment_id_to_profile_req;
+
+ for (const auto& entry : fragment_id_to_profile) {
+ int32_t fragment_id = entry.first;
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
fragment_profile = entry.second;
+ std::vector<TDetailedReportParams> detailed_params;
+
+ for (auto pipeline_profile : fragment_profile) {
+ if (pipeline_profile == nullptr) {
+ auto msg = fmt::format("Register fragment profile {} {}
failed, profile is null",
+ print_id(query_id), fragment_id);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ TDetailedReportParams tmp;
+ tmp.__set_profile(*pipeline_profile);
+ // tmp.fragment_instance_id is not needed for pipeline x
+ detailed_params.push_back(tmp);
+ }
+
+ fragment_id_to_profile_req.insert(std::make_pair(fragment_id,
detailed_params));
+ }
+
+ if (fragment_id_to_profile_req.size() == 0) {
+ LOG_WARNING("No fragment profile found for query {}",
print_id(query_id));
+ }
+
+ profile.__set_fragment_id_to_profile(fragment_id_to_profile_req);
+
+ std::vector<TRuntimeProfileTree> load_channel_profiles_req;
+ for (auto load_channel_profile : load_channel_profiles) {
+ if (load_channel_profile == nullptr) {
+ auto msg = fmt::format(
+ "Register fragment profile {} {} failed, load channel
profile is null",
+ print_id(query_id), -1);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ load_channel_profiles_req.push_back(*load_channel_profile);
+ }
+
+ if (load_channel_profiles_req.size() > 0) {
+ profile.__set_load_channel_profiles(load_channel_profiles_req);
+ }
+
+ TReportExecStatusParams req;
+ req.__set_query_profile(profile);
+ req.__set_query_id(TUniqueId());
+
+ return req;
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+ const TUniqueId& query_id,
+ const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
+ instance_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile) {
+ TQueryProfile profile;
+ std::vector<TUniqueId> fragment_instance_ids;
+ std::vector<TRuntimeProfileTree> instance_profiles;
+ std::vector<TRuntimeProfileTree> load_channel_profiles;
+
+ for (auto entry : instance_id_to_profile) {
+ TUniqueId instance_id = entry.first;
+ std::shared_ptr<TRuntimeProfileTree> profile = entry.second;
+
+ if (profile == nullptr) {
+ auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
+ print_id(query_id), print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ fragment_instance_ids.push_back(instance_id);
+ instance_profiles.push_back(*profile);
+ }
+
+ profile.__set_query_id(query_id);
+ profile.__set_fragment_instance_ids(fragment_instance_ids);
+ profile.__set_instance_profiles(instance_profiles);
+ profile.__set_load_channel_profiles(load_channel_profiles);
+
+ TReportExecStatusParams res;
+ res.__set_query_profile(profile);
+ res.__set_query_id(TUniqueId());
+ return res;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
Review Comment:
warning: method 'start_report_thread' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/runtime_query_statistics_mgr.h:99:
```diff
- void start_report_thread();
+ static void start_report_thread();
```
##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,432 @@
#include "runtime/runtime_query_statistics_mgr.h"
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
#include "util/debug_util.h"
+#include "util/hash_util.hpp"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
+static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
+ const TReportExecStatusParams& req,
+ TReportExecStatusResult& res) {
+ Status client_status;
+ FrontendServiceConnection
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+ &client_status);
+ if (!client_status.ok()) {
+ LOG_WARNING(
+ "could not get client rpc client of {} when reporting
profiles, reason is {}, "
+ "not reporting, profile will be lost",
+ PrintThriftNetworkAddress(coor_addr),
client_status.to_string());
+ return Status::RpcError("Client rpc client failed");
+ }
+
+ try {
+ try {
+ rpc_client->reportExecStatus(res, req);
+ } catch (const apache::thrift::transport::TTransportException& e) {
+ LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ LOG_WARNING("Reopen failed, reason: {}",
client_status.to_string());
+ return Status::RpcError("Open rpc client failed");
+ }
+
+ rpc_client->reportExecStatus(res, req);
+ }
+ } catch (apache::thrift::TApplicationException& e) {
+ if (e.getType() == e.UNKNOWN_METHOD) {
+ LOG_WARNING(
+ "Failed to send statistics to {} due to {}, usually
because the frontend "
+ "is not upgraded, check the version",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ } else {
+ LOG_WARNING(
+ "Failed to send statistics to {}, reason: {}, you can see
fe log for "
+ "details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ }
+ return Status::RpcError("Send stats failed");
+ } catch (std::exception& e) {
+ LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see
fe log for details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ return Status::RpcError("Send stats failed");
+ }
+
+ return Status::OK();
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+ const TUniqueId& query_id,
+ const std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ fragment_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profiles) {
+ TQueryProfile profile;
+ profile.__set_query_id(query_id);
+
+ std::map<int32_t, std::vector<TDetailedReportParams>>
fragment_id_to_profile_req;
+
+ for (const auto& entry : fragment_id_to_profile) {
+ int32_t fragment_id = entry.first;
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
fragment_profile = entry.second;
+ std::vector<TDetailedReportParams> detailed_params;
+
+ for (auto pipeline_profile : fragment_profile) {
+ if (pipeline_profile == nullptr) {
+ auto msg = fmt::format("Register fragment profile {} {}
failed, profile is null",
+ print_id(query_id), fragment_id);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ TDetailedReportParams tmp;
+ tmp.__set_profile(*pipeline_profile);
+ // tmp.fragment_instance_id is not needed for pipeline x
+ detailed_params.push_back(tmp);
+ }
+
+ fragment_id_to_profile_req.insert(std::make_pair(fragment_id,
detailed_params));
+ }
+
+ if (fragment_id_to_profile_req.size() == 0) {
+ LOG_WARNING("No fragment profile found for query {}",
print_id(query_id));
+ }
+
+ profile.__set_fragment_id_to_profile(fragment_id_to_profile_req);
+
+ std::vector<TRuntimeProfileTree> load_channel_profiles_req;
+ for (auto load_channel_profile : load_channel_profiles) {
+ if (load_channel_profile == nullptr) {
+ auto msg = fmt::format(
+ "Register fragment profile {} {} failed, load channel
profile is null",
+ print_id(query_id), -1);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ load_channel_profiles_req.push_back(*load_channel_profile);
+ }
+
+ if (load_channel_profiles_req.size() > 0) {
+ profile.__set_load_channel_profiles(load_channel_profiles_req);
+ }
+
+ TReportExecStatusParams req;
+ req.__set_query_profile(profile);
+ req.__set_query_id(TUniqueId());
+
+ return req;
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+ const TUniqueId& query_id,
+ const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
+ instance_id_to_profile,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile) {
+ TQueryProfile profile;
+ std::vector<TUniqueId> fragment_instance_ids;
+ std::vector<TRuntimeProfileTree> instance_profiles;
+ std::vector<TRuntimeProfileTree> load_channel_profiles;
+
+ for (auto entry : instance_id_to_profile) {
+ TUniqueId instance_id = entry.first;
+ std::shared_ptr<TRuntimeProfileTree> profile = entry.second;
+
+ if (profile == nullptr) {
+ auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
+ print_id(query_id), print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ fragment_instance_ids.push_back(instance_id);
+ instance_profiles.push_back(*profile);
+ }
+
+ profile.__set_query_id(query_id);
+ profile.__set_fragment_instance_ids(fragment_instance_ids);
+ profile.__set_instance_profiles(instance_profiles);
+ profile.__set_load_channel_profiles(load_channel_profiles);
+
+ TReportExecStatusParams res;
+ res.__set_query_profile(profile);
+ res.__set_query_id(TUniqueId());
+ return res;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+ if (started.load()) {
+ DCHECK(false) << "report thread has been started";
+ LOG_ERROR("report thread has been started");
+ return;
+ }
+
+ started.store(true);
+
+ for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) {
+
this->_report_profile_threads.emplace_back(std::make_unique<std::thread>(
+ &RuntimeQueryStatiticsMgr::report_query_profiles_thread,
this));
+ }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+ while (_query_profile_map.empty() && _profile_map_x.empty() &&
+ !_report_profile_thread_stop) {
+ _report_profile_cv.wait(lock);
+ }
+ }
+
+ _report_query_profiles_function();
+
+ {
+ std::lock_guard<std::mutex> lg(_report_profile_mutex);
+
+ if (_report_profile_thread_stop) {
+ LOG_INFO("Report profile thread stopped");
+ return;
+ }
+ }
+ }
+}
+
+void RuntimeQueryStatiticsMgr::trigger_report_profile() {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+ _report_profile_cv.notify_one();
+}
+
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+ if (!started) {
+ return;
+ }
+
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+ _report_profile_thread_stop = true;
+ LOG_INFO("All report threads are going to stop");
+ _report_profile_cv.notify_all();
+ }
+
+ for (const auto& thread : _report_profile_threads) {
+ thread->join();
+ }
+
+ LOG_INFO("All report threads stopped");
+}
+
+void RuntimeQueryStatiticsMgr::register_instance_profile(
+ const TUniqueId& query_id, const TNetworkAddress& coor_addr, const
TUniqueId& instance_id,
+ std::shared_ptr<TRuntimeProfileTree> instance_profile,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+ if (instance_profile == nullptr) {
+ auto msg = fmt::format("Register instance profile {} {} failed,
profile is null",
+ print_id(query_id), print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ return;
+ }
+
+ std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+ if (!_query_profile_map.contains(query_id)) {
+ _query_profile_map[query_id] = std::make_tuple(
+ coor_addr, std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>());
+ }
+
+ std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
instance_profile_map =
+ std::get<1>(_query_profile_map[query_id]);
+ instance_profile_map.insert(std::make_pair(instance_id, instance_profile));
+
+ if (load_channel_profile != nullptr) {
+ _load_channel_profile_map[instance_id] = load_channel_profile;
+ }
+
+ LOG_INFO("Register instance profile {} {}", print_id(query_id),
print_id(instance_id));
+}
+
+void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
+ const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t
fragment_id,
+ std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
+ std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x) {
+ for (const auto& p : p_profiles) {
+ if (p == nullptr) {
+ auto msg = fmt::format("Register fragment profile {} {} failed,
profile is null",
+ print_id(query_id), fragment_id);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ return;
+ }
+ }
+
+ std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+ if (!_profile_map_x.contains(query_id)) {
+ _profile_map_x[query_id] = std::make_tuple(
+ coor_addr,
+ std::unordered_map<int,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>());
+ }
+
+ std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ fragment_profile_map = std::get<1>(_profile_map_x[query_id]);
+ fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles));
+
+ if (load_channel_profile_x != nullptr) {
+ _load_channel_profile_map_x[std::make_pair(query_id, fragment_id)] =
load_channel_profile_x;
+ }
+
+ LOG_INFO("register x profile done {}, fragment {}, profiles {}",
print_id(query_id),
+ fragment_id, p_profiles.size());
+}
+
+void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() {
+ // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
+ decltype(_query_profile_map) profile_copy;
+ decltype(_load_channel_profile_map) load_channel_profile_copy;
+
+ {
+ std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+ profile_copy = _query_profile_map;
+ load_channel_profile_copy = _load_channel_profile_map;
+ _query_profile_map.clear();
+ _load_channel_profile_map.clear();
+ }
+
+ // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
+ for (const auto& entry : profile_copy) {
+ const auto& query_id = entry.first;
+ const auto& coor_addr = std::get<0>(entry.second);
+ const std::unordered_map<TUniqueId,
std::shared_ptr<TRuntimeProfileTree>>&
+ instance_id_to_profile = std::get<1>(entry.second);
+
+ for (const auto& profile_entry : instance_id_to_profile) {
+ const auto& instance_id = profile_entry.first;
+ const auto& instance_profile = profile_entry.second;
+
+ if (instance_profile == nullptr) {
+ auto msg = fmt::format("Query {} instance {} profile is null",
print_id(query_id),
+ print_id(instance_id));
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+ }
+
+ std::vector<std::shared_ptr<TRuntimeProfileTree>>
load_channel_profiles;
+ for (const auto& load_channel_profile : load_channel_profile_copy) {
+ if (load_channel_profile.second == nullptr) {
+ auto msg = fmt::format(
+ "Register fragment profile {} {} failed, load channel
profile is null",
+ print_id(query_id), -1);
+ DCHECK(false) << msg;
+ LOG_ERROR(msg);
+ continue;
+ }
+
+ load_channel_profiles.push_back(load_channel_profile.second);
+ }
+
+ TReportExecStatusParams req =
create_report_exec_status_params_non_pipeline(
+ query_id, instance_id_to_profile, load_channel_profiles);
+ TReportExecStatusResult res;
+ auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
+
+ if (res.status.status_code != TStatusCode::OK) {
+ std::stringstream ss;
+ res.status.printTo(ss);
+ LOG_WARNING("Query {} send profile to {} failed, msg: {}",
print_id(query_id),
+ PrintThriftNetworkAddress(coor_addr), ss.str());
+ } else {
+ LOG_INFO("Send {} profile finished", print_id(query_id));
+ }
+ }
+}
+
+void RuntimeQueryStatiticsMgr::_report_query_profiles_x() {
Review Comment:
warning: method '_report_query_profiles_x' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/runtime_query_statistics_mgr.h:129:
```diff
- void _report_query_profiles_x();
+ static void _report_query_profiles_x();
```
##########
be/src/service/backend_service.cpp:
##########
@@ -1129,4 +1130,29 @@
response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift());
}
+void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
+ const TGetRealtimeExecStatusRequest& request) {
+ if (!request.__isset.id) {
+ LOG_WARNING("Invalidate argument, id is empty");
+ response.__set_status(Status::InvalidArgument("id is empty").to_thrift());
+ }
+
+ LOG_INFO("Getting realtime exec status of query {}", print_id(request.id));
+ std::unique_ptr<TReportExecStatusParams> report_exec_status_params =
+ std::make_unique<TReportExecStatusParams>();
+ Status st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
+ request.id, report_exec_status_params.get());
+
+ if (!st.ok()) {
+ response.__set_status(st.to_thrift());
+ return;
+ }
+
+ report_exec_status_params->__set_query_id(TUniqueId());
+
+ response.__set_status(Status::OK().to_thrift());
+ response.__set_report_exec_status_params(*report_exec_status_params);
+ return;
Review Comment:
warning: redundant return statement at the end of a function with a void
return type [readability-redundant-control-flow]
be/src/service/backend_service.cpp:1153:
```diff
- response.__set_report_exec_status_params(*report_exec_status_params);
- return;
- }
+ }
```
##########
be/src/runtime/runtime_query_statistics_mgr.h:
##########
@@ -18,12 +18,22 @@
#pragma once
#include <gen_cpp/Data_types.h>
Review Comment:
warning: 'gen_cpp/Data_types.h' file not found [clang-diagnostic-error]
```cpp
#include <gen_cpp/Data_types.h>
^
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]