This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 75a6f28f2e1 [cherry-pick]Add query type when report (#35918)
75a6f28f2e1 is described below
commit 75a6f28f2e15d940705c77af60e66964ed3301eb
Author: wangbo <[email protected]>
AuthorDate: Tue Jun 11 10:51:59 2024 +0800
[cherry-pick]Add query type when report (#35918)
pick #34978
---
.../schema_scanner/schema_backend_active_tasks.cpp | 1 +
be/src/runtime/fragment_mgr.cpp | 9 ++++----
be/src/runtime/query_context.cpp | 24 ++++++++++++++++------
be/src/runtime/runtime_query_statistics_mgr.cpp | 13 ++++++++++--
be/src/runtime/runtime_query_statistics_mgr.h | 8 +++++---
.../java/org/apache/doris/catalog/SchemaTable.java | 1 +
6 files changed, 41 insertions(+), 15 deletions(-)
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
index aa84f0d68c3..f1155796ed4 100644
--- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -38,6 +38,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
{"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
+ {"QUERY_TYPE", TYPE_VARCHAR, sizeof(StringRef), false},
};
SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6646b8cdf27..4ca84f94041 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -640,6 +640,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
return Status::OK();
}
+ LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
+ << ", total fragment num on current host: " << params.fragment_num_on_host
+ << ", fe process uuid: " << params.query_options.fe_process_uuid
+ << ", query type: " << params.query_options.query_type;
+
// This may be a first fragment request of the query.
// Create the query fragments context.
query_ctx = QueryContext::create_shared(query_id,
params.fragment_num_on_host, _exec_env,
@@ -653,10 +658,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
query_ctx->file_scan_range_params_map = params.file_scan_params;
}
- LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, query_ctx->query_id().lo)
- << " coord_addr " << query_ctx->coord_addr
- << " total fragment num on current host: " << params.fragment_num_on_host
- << " fe process uuid: " << params.query_options.fe_process_uuid;
query_ctx->query_globals = params.query_globals;
if (params.__isset.resource_info) {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 4b4b6b006f1..f9cc9757fe3 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -53,7 +53,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_query_options(query_options) {
_init_query_mem_tracker();
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
- this->coord_addr = coord_addr;
+
_start_time = VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
@@ -64,6 +64,18 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
timeout_second = query_options.execution_timeout;
+ bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
+ query_options.query_type == TQueryType::LOAD ||
+ query_options.query_type == TQueryType::EXTERNAL;
+ DCHECK_EQ(is_query_type_valid, true);
+
+ this->coord_addr = coord_addr;
+ // external query has no coord_addr
+ if (query_options.query_type != TQueryType::EXTERNAL) {
+ bool is_coord_addr_valid = !this->coord_addr.hostname.empty() && this->coord_addr.port != 0;
+ DCHECK_EQ(is_coord_addr_valid, true);
+ }
+
register_memory_statistics();
register_cpu_statistics();
}
@@ -252,8 +264,8 @@ void QueryContext::set_pipeline_context(
}
void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
- _exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(_query_id), qs,
- coord_addr);
+ _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
+ print_id(_query_id), qs, coord_addr, _query_options.query_type);
}
std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
@@ -266,8 +278,8 @@ void QueryContext::register_memory_statistics() {
std::shared_ptr<QueryStatistics> qs = query_mem_tracker->get_query_statistics();
std::string query_id = print_id(_query_id);
if (qs) {
- _exec_env->runtime_query_statistics_mgr()->register_query_statistics(query_id, qs,
- coord_addr);
+ _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
+ query_id, qs, coord_addr, _query_options.query_type);
} else {
LOG(INFO) << " query " << query_id << " get memory query statistics failed ";
}
@@ -278,7 +290,7 @@ void QueryContext::register_cpu_statistics() {
if (!_cpu_statistics) {
_cpu_statistics = std::make_shared<QueryStatistics>();
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
- print_id(_query_id), _cpu_statistics, coord_addr);
+ print_id(_query_id), _cpu_statistics, coord_addr, _query_options.query_type);
}
}
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 955d1b9a7e8..5257f53bb00 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -36,10 +36,12 @@ void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
- TNetworkAddress fe_addr) {
+ TNetworkAddress fe_addr,
+ TQueryType::type query_type) {
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) {
- _query_statistics_ctx_map[query_id] = std::make_unique<QueryStatisticsCtx>(fe_addr);
+ _query_statistics_ctx_map[query_id] =
+ std::make_unique<QueryStatisticsCtx>(fe_addr, query_type);
}
_query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr);
}
@@ -54,6 +56,9 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
int64_t current_time = MonotonicMillis();
int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+ if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) {
+ continue;
+ }
if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
std::map<std::string, TQueryStatistics> tmp_map;
fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
@@ -247,6 +252,10 @@ void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* bloc
insert_int_value(8, tqs.current_used_memory_bytes, block);
insert_int_value(9, tqs.shuffle_send_bytes, block);
insert_int_value(10, tqs.shuffle_send_rows, block);
+
+ std::stringstream ss;
+ ss << qs_ctx_ptr->_query_type;
+ insert_string_value(11, ss.str(), block);
}
}
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h
index 1b3e164d48f..aa1793efbf9 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -34,7 +34,8 @@ class Block;
class QueryStatisticsCtx {
public:
- QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) {
+ QueryStatisticsCtx(TNetworkAddress fe_addr, TQueryType::type query_type)
+ : _fe_addr(fe_addr), _query_type(query_type) {
this->_is_query_finished = false;
this->_wg_id = -1;
this->_query_start_time = MonotonicMillis();
@@ -46,7 +47,8 @@ public:
public:
std::vector<std::shared_ptr<QueryStatistics>> _qs_list;
bool _is_query_finished;
- TNetworkAddress _fe_addr;
+ const TNetworkAddress _fe_addr;
+ const TQueryType::type _query_type;
int64_t _query_finish_time;
int64_t _wg_id;
int64_t _query_start_time;
@@ -58,7 +60,7 @@ public:
~RuntimeQueryStatiticsMgr() = default;
void register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
- TNetworkAddress fe_addr);
+ TNetworkAddress fe_addr, TQueryType::type query_type);
void report_runtime_query_statistics();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index a77532cef1e..92206dc3fdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -458,6 +458,7 @@ public class SchemaTable extends Table {
.column("CURRENT_USED_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("SHUFFLE_SEND_ROWS", ScalarType.createType(PrimitiveType.BIGINT))
+ .column("QUERY_TYPE", ScalarType.createVarchar(256))
.build()))
.put("active_queries", new SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA,
builder().column("QUERY_ID", ScalarType.createVarchar(256))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]