This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 298831ae7d8 [opt](scanner) Add scanner metrics (#40496)
298831ae7d8 is described below

commit 298831ae7d86b05f2b241ce0913073bc17f44ed8
Author: zhiqiang <[email protected]>
AuthorDate: Wed Sep 25 22:57:37 2024 +0800

    [opt](scanner) Add scanner metrics (#40496)
    
    We need something like this
    <img width="1426" alt="image"
    
src="https://github.com/user-attachments/assets/2a441af6-4fbb-4436-824d-2acc2bf1d497";>
---
 be/src/runtime/query_context.cpp           |  3 ++-
 be/src/util/doris_metrics.cpp              | 16 ++++++++++++++++
 be/src/util/doris_metrics.h                |  8 ++++++++
 be/src/vec/exec/scan/scanner_context.cpp   |  3 +++
 be/src/vec/exec/scan/scanner_context.h     |  4 +++-
 be/src/vec/exec/scan/scanner_scheduler.cpp | 11 +++++++++++
 be/src/vec/exec/scan/scanner_scheduler.h   | 10 +++++++++-
 be/src/vec/exec/scan/vscanner.cpp          |  1 +
 be/src/vec/exec/scan/vscanner.h            |  1 +
 9 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 055a78471e3..8931854897e 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -110,6 +110,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* 
exec_env,
     clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
     register_memory_statistics();
     register_cpu_statistics();
+    DorisMetrics::instance()->query_ctx_cnt->increment(1);
 }
 
 void QueryContext::_init_query_mem_tracker() {
@@ -198,7 +199,7 @@ QueryContext::~QueryContext() {
     _merge_controller_handler.reset();
 
     _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
-
+    DorisMetrics::instance()->query_ctx_cnt->increment(-1);
     LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id), 
mem_tracker_msg);
 }
 
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 4ec0b1370e6..3fe6b92c923 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -187,6 +187,14 @@ 
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total, MetricUnit::OPERAT
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache, 
MetricUnit::OPERATIONS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote, 
MetricUnit::OPERATIONS);
 
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, 
MetricUnit::NOUNIT);
+
 const std::string DorisMetrics::_s_registry_name = "doris_be";
 const std::string DorisMetrics::_s_hook_name = "doris_metrics";
 
@@ -306,6 +314,14 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, 
num_io_bytes_read_total);
     INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, 
num_io_bytes_read_from_cache);
     INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, 
num_io_bytes_read_from_remote);
+
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, 
scanner_task_cnt);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, 
scanner_task_queued);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, 
scanner_task_running);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, 
scanner_task_submit_failed);
 }
 
 void DorisMetrics::initialize(bool init_system_metrics, const 
std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index b201369454f..3006461059c 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -234,6 +234,14 @@ public:
     IntAtomicCounter* num_io_bytes_read_from_cache = nullptr;
     IntAtomicCounter* num_io_bytes_read_from_remote = nullptr;
 
+    IntAtomicCounter* query_ctx_cnt = nullptr;
+    IntAtomicCounter* scanner_ctx_cnt = nullptr;
+    IntAtomicCounter* scanner_cnt = nullptr;
+    IntAtomicCounter* scanner_task_cnt = nullptr;
+    IntAtomicCounter* scanner_task_queued = nullptr;
+    IntAtomicCounter* scanner_task_submit_failed = nullptr;
+    IntAtomicCounter* scanner_task_running = nullptr;
+
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index cbb3d0f5723..79bbcd94b8c 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -29,6 +29,7 @@
 #include "common/status.h"
 #include "pipeline/exec/scan_operator.h"
 #include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
@@ -120,6 +121,8 @@ ScannerContext::ScannerContext(
     _query_thread_context = {_query_id, _state->query_mem_tracker(),
                              _state->get_query_ctx()->workload_group()};
     _dependency = dependency;
+
+    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
 }
 
 // After init function call, should not access _parent
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 03c4e5a4f1b..1ebad17d418 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -57,11 +57,13 @@ class ScanTask {
 public:
     ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : 
scanner(delegate_scanner) {
         _query_thread_context.init_unlocked();
+        DorisMetrics::instance()->scanner_task_cnt->increment(1);
     }
 
     ~ScanTask() {
         
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
         cached_blocks.clear();
+        DorisMetrics::instance()->scanner_task_cnt->increment(-1);
     }
 
 private:
@@ -117,6 +119,7 @@ public:
             // do nothing
         }
         block.reset();
+        DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
     }
     Status init();
 
@@ -155,7 +158,6 @@ public:
 
     RuntimeState* state() { return _state; }
     void incr_ctx_scheduling_time(int64_t num) { 
_scanner_ctx_sched_time->update(num); }
-
     std::string parent_name();
 
     bool empty_in_queue(int id);
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index fdd677f0687..34a12fbd978 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -42,6 +42,7 @@
 #include "util/cpu_info.h"
 #include "util/defer_op.h"
 #include "util/doris_metrics.h"
+#include "util/metrics.h"
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
@@ -141,6 +142,11 @@ Status 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
 
         scanner_delegate->_scanner->start_wait_worker_timer();
         auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, 
ctx]() {
+            DorisMetrics::instance()->scanner_task_queued->increment(-1);
+            DorisMetrics::instance()->scanner_task_running->increment(1);
+            Defer metrics_defer(
+                    [&] { 
DorisMetrics::instance()->scanner_task_running->increment(-1); });
+
             auto status = [&] {
                 RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
                 return Status::OK();
@@ -172,6 +178,11 @@ Status 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                         is_local ? _local_scan_thread_pool.get() : 
_remote_scan_thread_pool.get();
             }
             auto work_func = [scanner_ref = scan_task, ctx]() {
+                DorisMetrics::instance()->scanner_task_queued->increment(-1);
+                DorisMetrics::instance()->scanner_task_running->increment(1);
+                Defer metrics_defer(
+                        [&] { 
DorisMetrics::instance()->scanner_task_running->increment(-1); });
+
                 auto status = [&] {
                     RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
                     return Status::OK();
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 439291f2107..f832e348088 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -18,9 +18,11 @@
 #pragma once
 
 #include <atomic>
+#include <cstdint>
 #include <memory>
 
 #include "common/status.h"
+#include "util/doris_metrics.h"
 #include "util/threadpool.h"
 #include "vec/exec/scan/vscanner.h"
 
@@ -135,7 +137,13 @@ public:
 
     Status submit_scan_task(SimplifiedScanTask scan_task) {
         if (!_is_stop) {
-            return _scan_thread_pool->submit_func([scan_task] { 
scan_task.scan_func(); });
+            DorisMetrics::instance()->scanner_task_queued->increment(1);
+            auto st = _scan_thread_pool->submit_func([scan_task] { 
scan_task.scan_func(); });
+            if (!st.ok()) {
+                DorisMetrics::instance()->scanner_task_queued->increment(-1);
+                
DorisMetrics::instance()->scanner_task_submit_failed->increment(1);
+            }
+            return st;
         } else {
             return Status::InternalError<false>("scanner pool {} is 
shutdown.", _sched_name);
         }
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index a78f8956025..ae255f85a7f 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -39,6 +39,7 @@ VScanner::VScanner(RuntimeState* state, 
pipeline::ScanLocalStateBase* local_stat
           _output_tuple_desc(_local_state->output_tuple_desc()),
           
_output_row_descriptor(_local_state->_parent->output_row_descriptor()) {
     _total_rf_num = _local_state->runtime_filter_num();
+    DorisMetrics::instance()->scanner_cnt->increment(1);
 }
 
 Status VScanner::prepare(RuntimeState* state, const VExprContextSPtrs& 
conjuncts) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 29ad37e9269..6c4f3294ce1 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -66,6 +66,7 @@ public:
         _origin_block.clear();
         _common_expr_ctxs_push_down.clear();
         _stale_expr_ctxs.clear();
+        DorisMetrics::instance()->scanner_cnt->increment(-1);
     }
 
     virtual Status init() { return Status::OK(); }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to