This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 75a6f28f2e1 [cherry-pick]Add query type when report (#35918)
75a6f28f2e1 is described below

commit 75a6f28f2e15d940705c77af60e66964ed3301eb
Author: wangbo <[email protected]>
AuthorDate: Tue Jun 11 10:51:59 2024 +0800

    [cherry-pick]Add query type when report (#35918)
    
    pick #34978
---
 .../schema_scanner/schema_backend_active_tasks.cpp |  1 +
 be/src/runtime/fragment_mgr.cpp                    |  9 ++++----
 be/src/runtime/query_context.cpp                   | 24 ++++++++++++++++------
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 13 ++++++++++--
 be/src/runtime/runtime_query_statistics_mgr.h      |  8 +++++---
 .../java/org/apache/doris/catalog/SchemaTable.java |  1 +
 6 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
index aa84f0d68c3..f1155796ed4 100644
--- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -38,6 +38,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
         {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
         {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
         {"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
+        {"QUERY_TYPE", TYPE_VARCHAR, sizeof(StringRef), false},
 };
 
 SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6646b8cdf27..4ca84f94041 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -640,6 +640,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
             return Status::OK();
         }
 
+        LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
+                  << ", total fragment num on current host: " << params.fragment_num_on_host
+                  << ", fe process uuid: " << params.query_options.fe_process_uuid
+                  << ", query type: " << params.query_options.query_type;
+
         // This may be a first fragment request of the query.
         // Create the query fragments context.
         query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env,
@@ -653,10 +658,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
             query_ctx->file_scan_range_params_map = params.file_scan_params;
         }
 
-        LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, query_ctx->query_id().lo)
-                  << " coord_addr " << query_ctx->coord_addr
-                  << " total fragment num on current host: " << params.fragment_num_on_host
-                  << " fe process uuid: " << params.query_options.fe_process_uuid;
         query_ctx->query_globals = params.query_globals;
 
         if (params.__isset.resource_info) {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 4b4b6b006f1..f9cc9757fe3 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -53,7 +53,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
           _query_options(query_options) {
     _init_query_mem_tracker();
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
-    this->coord_addr = coord_addr;
+
     _start_time = VecDateTimeValue::local_time();
     _shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
     _shared_scanner_controller.reset(new vectorized::SharedScannerController());
@@ -64,6 +64,18 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
 
     timeout_second = query_options.execution_timeout;
 
+    bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
+                               query_options.query_type == TQueryType::LOAD ||
+                               query_options.query_type == TQueryType::EXTERNAL;
+    DCHECK_EQ(is_query_type_valid, true);
+
+    this->coord_addr = coord_addr;
+    // external query has no coord_addr
+    if (query_options.query_type != TQueryType::EXTERNAL) {
+        bool is_coord_addr_valid = !this->coord_addr.hostname.empty() && this->coord_addr.port != 0;
+        DCHECK_EQ(is_coord_addr_valid, true);
+    }
+
     register_memory_statistics();
     register_cpu_statistics();
 }
@@ -252,8 +264,8 @@ void QueryContext::set_pipeline_context(
 }
 
 void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
-    _exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(_query_id), qs,
-                                                                         coord_addr);
+    _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
+            print_id(_query_id), qs, coord_addr, _query_options.query_type);
 }
 
 std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
@@ -266,8 +278,8 @@ void QueryContext::register_memory_statistics() {
         std::shared_ptr<QueryStatistics> qs = query_mem_tracker->get_query_statistics();
         std::string query_id = print_id(_query_id);
         if (qs) {
-            _exec_env->runtime_query_statistics_mgr()->register_query_statistics(query_id, qs,
-                                                                                 coord_addr);
+            _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
+                    query_id, qs, coord_addr, _query_options.query_type);
         } else {
             LOG(INFO) << " query " << query_id << " get memory query statistics failed ";
         }
@@ -278,7 +290,7 @@ void QueryContext::register_cpu_statistics() {
     if (!_cpu_statistics) {
         _cpu_statistics = std::make_shared<QueryStatistics>();
         _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                print_id(_query_id), _cpu_statistics, coord_addr);
+                print_id(_query_id), _cpu_statistics, coord_addr, _query_options.query_type);
     }
 }
 
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 955d1b9a7e8..5257f53bb00 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -36,10 +36,12 @@ void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
 
 void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
                                                          
std::shared_ptr<QueryStatistics> qs_ptr,
-                                                         TNetworkAddress 
fe_addr) {
+                                                         TNetworkAddress 
fe_addr,
+                                                         TQueryType::type 
query_type) {
     std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
     if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) {
-        _query_statistics_ctx_map[query_id] = std::make_unique<QueryStatisticsCtx>(fe_addr);
+        _query_statistics_ctx_map[query_id] =
+                std::make_unique<QueryStatisticsCtx>(fe_addr, query_type);
     }
     _query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr);
 }
@@ -54,6 +56,9 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
         int64_t current_time = MonotonicMillis();
         int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
         for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+            if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) {
+                continue;
+            }
             if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
                 std::map<std::string, TQueryStatistics> tmp_map;
                 fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
@@ -247,6 +252,10 @@ void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* bloc
         insert_int_value(8, tqs.current_used_memory_bytes, block);
         insert_int_value(9, tqs.shuffle_send_bytes, block);
         insert_int_value(10, tqs.shuffle_send_rows, block);
+
+        std::stringstream ss;
+        ss << qs_ctx_ptr->_query_type;
+        insert_string_value(11, ss.str(), block);
     }
 }
 
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h
index 1b3e164d48f..aa1793efbf9 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -34,7 +34,8 @@ class Block;
 
 class QueryStatisticsCtx {
 public:
-    QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) {
+    QueryStatisticsCtx(TNetworkAddress fe_addr, TQueryType::type query_type)
+            : _fe_addr(fe_addr), _query_type(query_type) {
         this->_is_query_finished = false;
         this->_wg_id = -1;
         this->_query_start_time = MonotonicMillis();
@@ -46,7 +47,8 @@ public:
 public:
     std::vector<std::shared_ptr<QueryStatistics>> _qs_list;
     bool _is_query_finished;
-    TNetworkAddress _fe_addr;
+    const TNetworkAddress _fe_addr;
+    const TQueryType::type _query_type;
     int64_t _query_finish_time;
     int64_t _wg_id;
     int64_t _query_start_time;
@@ -58,7 +60,7 @@ public:
     ~RuntimeQueryStatiticsMgr() = default;
 
     void register_query_statistics(std::string query_id, std::shared_ptr<QueryStatistics> qs_ptr,
-                                   TNetworkAddress fe_addr);
+                                   TNetworkAddress fe_addr, TQueryType::type query_type);
 
     void report_runtime_query_statistics();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index a77532cef1e..92206dc3fdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -458,6 +458,7 @@ public class SchemaTable extends Table {
                                     .column("CURRENT_USED_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("SHUFFLE_SEND_ROWS", ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("QUERY_TYPE",  ScalarType.createVarchar(256))
                                     .build()))
             .put("active_queries", new SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA,
                     builder().column("QUERY_ID", ScalarType.createVarchar(256))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to