This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 16e25f6335b [opt](profile) Avoid unnecessary copies in the profile
thrift (#34720)
16e25f6335b is described below
commit 16e25f6335b19d90ff83bbaa7a5238a1c55b3402
Author: Mryange <[email protected]>
AuthorDate: Wed May 22 14:23:59 2024 +0800
[opt](profile) Avoid unnecessary copies in the profile thrift (#34720)
---
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/query_context.cpp | 3 ++-
be/src/runtime/runtime_query_statistics_mgr.cpp | 33 +++++++++++++------------
be/src/runtime/runtime_query_statistics_mgr.h | 5 ++--
be/src/service/backend_service.cpp | 13 ++++++++--
be/src/util/runtime_profile.cpp | 9 +++----
be/src/util/runtime_profile.h | 2 +-
be/src/util/thrift_client.h | 5 ++++
gensrc/thrift/Makefile | 2 +-
9 files changed, 43 insertions(+), 31 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 40013fb33dd..97a8502cfcc 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -281,7 +281,7 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
detailed_param.__isset.profile = true;
detailed_param.__isset.loadChannelProfile = false;
pipeline_profile->to_thrift(&detailed_param.profile);
- params.detailed_report.push_back(detailed_param);
+
params.detailed_report.push_back(std::move(detailed_param));
}
}
} else {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 5360fbe4e4b..4f63f85e231 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -470,7 +470,8 @@ TReportExecStatusParams
QueryContext::get_realtime_exec_status_x() const {
}
exec_status =
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
- this->_query_id, realtime_query_profile,
load_channel_profiles, /*is_done=*/false);
+ this->_query_id, std::move(realtime_query_profile),
+ std::move(load_channel_profiles), /*is_done=*/false);
} else {
auto msg = fmt::format("Query {} is not pipelineX query",
print_id(_query_id));
LOG_ERROR(msg);
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 46e73940934..55051eff686 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -41,6 +41,7 @@
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/hash_util.hpp"
+#include "util/thrift_client.h"
#include "util/time.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
@@ -102,10 +103,10 @@ static Status _do_report_exec_stats_rpc(const
TNetworkAddress& coor_addr,
TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
const TUniqueId& query_id,
- const std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
fragment_id_to_profile,
- const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profiles,
- bool is_done) {
+ std::vector<std::shared_ptr<TRuntimeProfileTree>>
load_channel_profiles, bool is_done) {
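+ // NOTE: this function moves the data out of the profile trees referenced by
fragment_id_to_profile and load_channel_profiles; other holders of those shared_ptrs must not read them afterwards.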
TQueryProfile profile;
profile.__set_query_id(query_id);
@@ -126,15 +127,15 @@ TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_para
}
TDetailedReportParams tmp;
- tmp.__set_profile(*pipeline_profile);
+ THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile);
// tmp.fragment_instance_id is not needed for pipeline x
- detailed_params.push_back(tmp);
+ detailed_params.push_back(std::move(tmp));
}
- fragment_id_to_profile_req.insert(std::make_pair(fragment_id,
detailed_params));
+ fragment_id_to_profile_req[fragment_id] = std::move(detailed_params);
}
- if (fragment_id_to_profile_req.size() == 0) {
+ if (fragment_id_to_profile_req.empty()) {
LOG_WARNING("No fragment profile found for query {}",
print_id(query_id));
}
@@ -151,15 +152,15 @@ TReportExecStatusParams
RuntimeQueryStatiticsMgr::create_report_exec_status_para
continue;
}
- load_channel_profiles_req.push_back(*load_channel_profile);
+ load_channel_profiles_req.push_back(std::move(*load_channel_profile));
}
- if (load_channel_profiles_req.size() > 0) {
- profile.__set_load_channel_profiles(load_channel_profiles_req);
+ if (!load_channel_profiles_req.empty()) {
+ THRIFT_MOVE_VALUES(profile, load_channel_profiles,
load_channel_profiles_req);
}
TReportExecStatusParams req;
- req.__set_query_profile(profile);
+ THRIFT_MOVE_VALUES(req, query_profile, profile);
req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
// invalid query id to avoid API compatibility during upgrade
req.__set_query_id(TUniqueId());
@@ -408,10 +409,10 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_x()
{
}
// query_id -> {coordinator_addr, {fragment_id ->
std::vectpr<pipeline_profile>}}
- for (const auto& entry : profile_copy) {
+ for (auto& entry : profile_copy) {
const auto& query_id = entry.first;
const auto& coor_addr = std::get<0>(entry.second);
- const auto& fragment_profile_map = std::get<1>(entry.second);
+ auto& fragment_profile_map = std::get<1>(entry.second);
if (fragment_profile_map.empty()) {
auto msg = fmt::format("Query {} does not have profile",
print_id(query_id));
@@ -435,13 +436,13 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_x()
{
}
TReportExecStatusParams req = create_report_exec_status_params_x(
- query_id, fragment_profile_map, load_channel_profiles,
/*is_done=*/true);
+ query_id, std::move(fragment_profile_map),
std::move(load_channel_profiles),
+ /*is_done=*/true);
TReportExecStatusResult res;
auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
- if (res.status.status_code != TStatusCode::OK ||
- res.status.status_code != TStatusCode::OK) {
+ if (res.status.status_code != TStatusCode::OK || !rpc_status.ok()) {
LOG_WARNING("Query {} send profile to {} failed",
print_id(query_id),
PrintThriftNetworkAddress(coor_addr));
} else {
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
index d7e473019d8..ff61f665342 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -71,10 +71,9 @@ public:
static TReportExecStatusParams create_report_exec_status_params_x(
const TUniqueId& q_id,
- const std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+ std::unordered_map<int32,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
fragment_id_to_profile,
- const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
load_channel_profile,
- bool is_done);
+ std::vector<std::shared_ptr<TRuntimeProfileTree>>
load_channel_profile, bool is_done);
static TReportExecStatusParams
create_report_exec_status_params_non_pipeline(
const TUniqueId& q_id,
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 296e8b08a5e..de1e1cd9b25 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -24,6 +24,7 @@
#include <gen_cpp/Data_types.h>
#include <gen_cpp/DorisExternalService_types.h>
#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Planner_types.h>
#include <gen_cpp/Status_types.h>
@@ -69,6 +70,7 @@
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/arrow/row_batch.h"
#include "util/defer_op.h"
+#include "util/runtime_profile.h"
#include "util/threadpool.h"
#include "util/thrift_server.h"
#include "util/uid_util.h"
@@ -1171,7 +1173,15 @@ void
BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse
return;
}
- LOG_INFO("Getting realtime exec status of query {}", print_id(request.id));
+ RuntimeProfile::Counter get_realtime_timer {TUnit::TIME_NS};
+
+ Defer _print_log([&]() {
+ LOG_INFO("Getting realtime exec status of query {} , cost time {}",
print_id(request.id),
+ PrettyPrinter::print(get_realtime_timer.value(),
get_realtime_timer.type()));
+ });
+
+ SCOPED_TIMER(&get_realtime_timer);
+
std::unique_ptr<TReportExecStatusParams> report_exec_status_params =
std::make_unique<TReportExecStatusParams>();
Status st =
ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
@@ -1187,7 +1197,6 @@ void
BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse
response.__set_status(Status::OK().to_thrift());
response.__set_report_exec_status_params(*report_exec_status_params);
- return;
}
} // namespace doris
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 7c9d40ba3ed..a9e197fba9b 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -589,15 +589,12 @@ void
RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) {
if (this->is_set_sink()) {
node.__set_is_sink(this->is_sink());
}
- CounterMap counter_map;
{
std::lock_guard<std::mutex> l(_counter_map_lock);
- counter_map = _counter_map;
node.child_counters_map = _child_counter_map;
- }
-
- for (auto&& [name, counter] : counter_map) {
- counter->to_thrift(name, node.counters, node.child_counters_map);
+ for (auto&& [name, counter] : _counter_map) {
+ counter->to_thrift(name, node.counters, node.child_counters_map);
+ }
}
{
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index c28329fe5da..b77157d1f5b 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -123,7 +123,7 @@ public:
counter.value = this->value();
counter.type = this->type();
counter.__set_level(this->level());
- tcounters.push_back(counter);
+ tcounters.push_back(std::move(counter));
}
TUnit::type type() const { return _type; }
diff --git a/be/src/util/thrift_client.h b/be/src/util/thrift_client.h
index 2328f298450..e60bc32af72 100644
--- a/be/src/util/thrift_client.h
+++ b/be/src/util/thrift_client.h
@@ -38,6 +38,11 @@ class TTransport;
} // namespace apache
namespace doris {
+// Flags `member` as set on `thrift` and moves `value` into it; expands to one statement.
+#define THRIFT_MOVE_VALUES(thrift, member, value) \
+    do { (thrift).__isset.member = true; /* do/while(0): safe in an unbraced if/else */ \
+        (thrift).member = std::move(value); } while (0)
+
// Super class for templatized thrift clients.
class ThriftClientImpl {
public:
diff --git a/gensrc/thrift/Makefile b/gensrc/thrift/Makefile
index e2d81952d54..bc30124bd81 100644
--- a/gensrc/thrift/Makefile
+++ b/gensrc/thrift/Makefile
@@ -31,7 +31,7 @@ all: ${GEN_OBJECTS} ${OBJECTS}
$(shell mkdir -p ${BUILD_DIR}/gen_java)
-THRIFT_CPP_ARGS = -I ${CURDIR} -I ${BUILD_DIR}/thrift/ --gen cpp -out
${BUILD_DIR}/gen_cpp --allow-64bit-consts -strict
+THRIFT_CPP_ARGS = -I ${CURDIR} -I ${BUILD_DIR}/thrift/ --gen
cpp:moveable_types -out ${BUILD_DIR}/gen_cpp --allow-64bit-consts -strict
THRIFT_JAVA_ARGS = -I ${CURDIR} -I ${BUILD_DIR}/thrift/ --gen java:fullcamel
-out ${BUILD_DIR}/gen_java --allow-64bit-consts -strict
${BUILD_DIR}/gen_cpp:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]