github-actions[bot] commented on code in PR #33015:
URL: https://github.com/apache/doris/pull/33015#discussion_r1547791651
##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,307 @@
#include "runtime/runtime_query_statistics_mgr.h"
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
+static Status _doReportExecStatsRpc(const TNetworkAddress& coor_addr,
+ const TReportExecStatusParams& req,
+ TReportExecStatusResult& res) {
+ Status client_status;
+ FrontendServiceConnection
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+ &client_status);
+ if (!client_status.ok()) {
+ LOG_WARNING(
+ "could not get client rpc client of {} when reporting
profiles, reason is {}, "
+ "not reporting, profile will be lost",
+ PrintThriftNetworkAddress(coor_addr),
client_status.to_string());
+ return Status::RpcError("Client rpc client failed");
+ }
+
+ try {
+ try {
+ rpc_client->reportExecStatus(res, req);
+ } catch (const apache::thrift::transport::TTransportException& e) {
+ LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ LOG_WARNING("Reopen failed, reason: {}",
client_status.to_string());
+ return Status::RpcError("Open rpc client failed");
+ }
+
+ rpc_client->reportExecStatus(res, req);
+ }
+ } catch (apache::thrift::TApplicationException& e) {
+ if (e.getType() == e.UNKNOWN_METHOD) {
+ LOG_WARNING(
+ "Failed to send statistics to {} due to {}, usually
because the frontend "
+ "is not upgraded, check the version",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ } else {
+ LOG_WARNING(
+ "Failed to send statistics to {}, reason: {}, you can see
fe log for "
+ "details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ }
+ return Status::RpcError("Send stats failed");
+ } catch (std::exception& e) {
+ LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see
fe log for details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ return Status::RpcError("Send stats failed");
+ }
+
+ return Status::OK();
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+ const TUniqueId& query_id, int32 fragment_id,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile) {
+ TReportExecStatusParams req;
+ req.__set_query_id(query_id);
+ req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+ req.__set_fragment_id(fragment_id);
+ std::vector<TDetailedReportParams> detailed_params;
+ for (const auto& p_profile : f_profile) {
+ TDetailedReportParams tmp;
+ tmp.__set_profile(*p_profile);
+ // tmp.fragment_instance_id is not needed for pipeline x
+ detailed_params.push_back(tmp);
+ }
+
+ req.__set_detailed_report(detailed_params);
+ return req;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+ if (started.load()) {
+ DCHECK(false) << "report thread has been started";
+ LOG_ERROR("report thread has been started");
+ return;
+ }
+
+ started.store(true);
+
+ for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) {
+
this->_report_profile_threads.emplace_back(std::make_unique<std::thread>(
+ &RuntimeQueryStatiticsMgr::report_query_profiles_thread,
this));
+ }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+ while (_profile_map.empty() && _profile_map_x.empty() &&
!_report_profile_thread_stop) {
+ _report_profile_cv.wait(lock);
+ }
+
+ if (_report_profile_thread_stop) {
+ LOG_INFO("Report profile thread stopped");
+ return;
+ }
+ }
+
+ _report_query_profiles_function();
+ }
+}
+
+void RuntimeQueryStatiticsMgr::force_report_profile() {
Review Comment:
warning: method 'force_report_profile' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/runtime_query_statistics_mgr.h:90:
```diff
- void force_report_profile();
+ static void force_report_profile();
```
##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,307 @@
#include "runtime/runtime_query_statistics_mgr.h"
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
+static Status _doReportExecStatsRpc(const TNetworkAddress& coor_addr,
+ const TReportExecStatusParams& req,
+ TReportExecStatusResult& res) {
+ Status client_status;
+ FrontendServiceConnection
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+ &client_status);
+ if (!client_status.ok()) {
+ LOG_WARNING(
+ "could not get client rpc client of {} when reporting
profiles, reason is {}, "
+ "not reporting, profile will be lost",
+ PrintThriftNetworkAddress(coor_addr),
client_status.to_string());
+ return Status::RpcError("Client rpc client failed");
+ }
+
+ try {
+ try {
+ rpc_client->reportExecStatus(res, req);
+ } catch (const apache::thrift::transport::TTransportException& e) {
+ LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ LOG_WARNING("Reopen failed, reason: {}",
client_status.to_string());
+ return Status::RpcError("Open rpc client failed");
+ }
+
+ rpc_client->reportExecStatus(res, req);
+ }
+ } catch (apache::thrift::TApplicationException& e) {
+ if (e.getType() == e.UNKNOWN_METHOD) {
+ LOG_WARNING(
+ "Failed to send statistics to {} due to {}, usually
because the frontend "
+ "is not upgraded, check the version",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ } else {
+ LOG_WARNING(
+ "Failed to send statistics to {}, reason: {}, you can see
fe log for "
+ "details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ }
+ return Status::RpcError("Send stats failed");
+ } catch (std::exception& e) {
+ LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see
fe log for details.",
+ PrintThriftNetworkAddress(coor_addr), e.what());
+ return Status::RpcError("Send stats failed");
+ }
+
+ return Status::OK();
+}
+
+TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+ const TUniqueId& query_id, int32 fragment_id,
+ const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile) {
+ TReportExecStatusParams req;
+ req.__set_query_id(query_id);
+ req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+ req.__set_fragment_id(fragment_id);
+ std::vector<TDetailedReportParams> detailed_params;
+ for (const auto& p_profile : f_profile) {
+ TDetailedReportParams tmp;
+ tmp.__set_profile(*p_profile);
+ // tmp.fragment_instance_id is not needed for pipeline x
+ detailed_params.push_back(tmp);
+ }
+
+ req.__set_detailed_report(detailed_params);
+ return req;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
Review Comment:
warning: method 'start_report_thread' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/runtime_query_statistics_mgr.h:88:
```diff
- void start_report_thread();
+ static void start_report_thread();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]