This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 298831ae7d8 [opt](scanner) Add scanner metrics (#40496)
298831ae7d8 is described below
commit 298831ae7d86b05f2b241ce0913073bc17f44ed8
Author: zhiqiang <[email protected]>
AuthorDate: Wed Sep 25 22:57:37 2024 +0800
[opt](scanner) Add scanner metrics (#40496)
We need something like this
<img width="1426" alt="image"
src="https://github.com/user-attachments/assets/2a441af6-4fbb-4436-824d-2acc2bf1d497">
---
be/src/runtime/query_context.cpp | 3 ++-
be/src/util/doris_metrics.cpp | 16 ++++++++++++++++
be/src/util/doris_metrics.h | 8 ++++++++
be/src/vec/exec/scan/scanner_context.cpp | 3 +++
be/src/vec/exec/scan/scanner_context.h | 4 +++-
be/src/vec/exec/scan/scanner_scheduler.cpp | 11 +++++++++++
be/src/vec/exec/scan/scanner_scheduler.h | 10 +++++++++-
be/src/vec/exec/scan/vscanner.cpp | 1 +
be/src/vec/exec/scan/vscanner.h | 1 +
9 files changed, 54 insertions(+), 3 deletions(-)
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 055a78471e3..8931854897e 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -110,6 +110,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv*
exec_env,
clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
register_memory_statistics();
register_cpu_statistics();
+ DorisMetrics::instance()->query_ctx_cnt->increment(1);
}
void QueryContext::_init_query_mem_tracker() {
@@ -198,7 +199,7 @@ QueryContext::~QueryContext() {
_merge_controller_handler.reset();
_exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
-
+ DorisMetrics::instance()->query_ctx_cnt->increment(-1);
LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id),
mem_tracker_msg);
}
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 4ec0b1370e6..3fe6b92c923 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -187,6 +187,14 @@
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total, MetricUnit::OPERAT
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote,
MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed,
MetricUnit::NOUNIT);
+
const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -306,6 +314,14 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_total);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_cache);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_remote);
+
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_cnt);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_queued);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_running);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_submit_failed);
}
void DorisMetrics::initialize(bool init_system_metrics, const
std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index b201369454f..3006461059c 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -234,6 +234,14 @@ public:
IntAtomicCounter* num_io_bytes_read_from_cache = nullptr;
IntAtomicCounter* num_io_bytes_read_from_remote = nullptr;
+ IntAtomicCounter* query_ctx_cnt = nullptr;
+ IntAtomicCounter* scanner_ctx_cnt = nullptr;
+ IntAtomicCounter* scanner_cnt = nullptr;
+ IntAtomicCounter* scanner_task_cnt = nullptr;
+ IntAtomicCounter* scanner_task_queued = nullptr;
+ IntAtomicCounter* scanner_task_submit_failed = nullptr;
+ IntAtomicCounter* scanner_task_running = nullptr;
+
static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index cbb3d0f5723..79bbcd94b8c 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -29,6 +29,7 @@
#include "common/status.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
@@ -120,6 +121,8 @@ ScannerContext::ScannerContext(
_query_thread_context = {_query_id, _state->query_mem_tracker(),
_state->get_query_ctx()->workload_group()};
_dependency = dependency;
+
+ DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
// After init function call, should not access _parent
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 03c4e5a4f1b..1ebad17d418 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -57,11 +57,13 @@ class ScanTask {
public:
ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) :
scanner(delegate_scanner) {
_query_thread_context.init_unlocked();
+ DorisMetrics::instance()->scanner_task_cnt->increment(1);
}
~ScanTask() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
cached_blocks.clear();
+ DorisMetrics::instance()->scanner_task_cnt->increment(-1);
}
private:
@@ -117,6 +119,7 @@ public:
// do nothing
}
block.reset();
+ DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
}
Status init();
@@ -155,7 +158,6 @@ public:
RuntimeState* state() { return _state; }
void incr_ctx_scheduling_time(int64_t num) {
_scanner_ctx_sched_time->update(num); }
-
std::string parent_name();
bool empty_in_queue(int id);
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index fdd677f0687..34a12fbd978 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -42,6 +42,7 @@
#include "util/cpu_info.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
+#include "util/metrics.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -141,6 +142,11 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func([scanner_ref = scan_task,
ctx]() {
+ DorisMetrics::instance()->scanner_task_queued->increment(-1);
+ DorisMetrics::instance()->scanner_task_running->increment(1);
+ Defer metrics_defer(
+ [&] {
DorisMetrics::instance()->scanner_task_running->increment(-1); });
+
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
@@ -172,6 +178,11 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
is_local ? _local_scan_thread_pool.get() :
_remote_scan_thread_pool.get();
}
auto work_func = [scanner_ref = scan_task, ctx]() {
+ DorisMetrics::instance()->scanner_task_queued->increment(-1);
+ DorisMetrics::instance()->scanner_task_running->increment(1);
+ Defer metrics_defer(
+ [&] {
DorisMetrics::instance()->scanner_task_running->increment(-1); });
+
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 439291f2107..f832e348088 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -18,9 +18,11 @@
#pragma once
#include <atomic>
+#include <cstdint>
#include <memory>
#include "common/status.h"
+#include "util/doris_metrics.h"
#include "util/threadpool.h"
#include "vec/exec/scan/vscanner.h"
@@ -135,7 +137,13 @@ public:
Status submit_scan_task(SimplifiedScanTask scan_task) {
if (!_is_stop) {
- return _scan_thread_pool->submit_func([scan_task] {
scan_task.scan_func(); });
+ DorisMetrics::instance()->scanner_task_queued->increment(1);
+ auto st = _scan_thread_pool->submit_func([scan_task] {
scan_task.scan_func(); });
+ if (!st.ok()) {
+ DorisMetrics::instance()->scanner_task_queued->increment(-1);
+
DorisMetrics::instance()->scanner_task_submit_failed->increment(1);
+ }
+ return st;
} else {
return Status::InternalError<false>("scanner pool {} is
shutdown.", _sched_name);
}
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index a78f8956025..ae255f85a7f 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -39,6 +39,7 @@ VScanner::VScanner(RuntimeState* state,
pipeline::ScanLocalStateBase* local_stat
_output_tuple_desc(_local_state->output_tuple_desc()),
_output_row_descriptor(_local_state->_parent->output_row_descriptor()) {
_total_rf_num = _local_state->runtime_filter_num();
+ DorisMetrics::instance()->scanner_cnt->increment(1);
}
Status VScanner::prepare(RuntimeState* state, const VExprContextSPtrs&
conjuncts) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 29ad37e9269..6c4f3294ce1 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -66,6 +66,7 @@ public:
_origin_block.clear();
_common_expr_ctxs_push_down.clear();
_stale_expr_ctxs.clear();
+ DorisMetrics::instance()->scanner_cnt->increment(-1);
}
virtual Status init() { return Status::OK(); }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]