This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e0c9cbde497 [opt](scanner) Add scanner metrics #40496 (#41314)
e0c9cbde497 is described below
commit e0c9cbde497b502ddb37d60d95b288469d856d30
Author: zhiqiang <[email protected]>
AuthorDate: Fri Sep 27 09:56:51 2024 +0800
[opt](scanner) Add scanner metrics #40496 (#41314)
cherry pick from #40496
---
be/src/runtime/query_context.cpp | 3 ++-
be/src/util/doris_metrics.cpp | 16 ++++++++++++++++
be/src/util/doris_metrics.h | 9 ++++++++-
be/src/vec/exec/scan/scanner_context.cpp | 3 +++
be/src/vec/exec/scan/scanner_context.h | 4 +++-
be/src/vec/exec/scan/scanner_scheduler.cpp | 11 +++++++++++
be/src/vec/exec/scan/scanner_scheduler.h | 10 +++++++++-
be/src/vec/exec/scan/vscanner.cpp | 1 +
be/src/vec/exec/scan/vscanner.h | 1 +
9 files changed, 54 insertions(+), 4 deletions(-)
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index e69132e3cb2..ec559068e19 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -99,6 +99,7 @@ QueryContext::QueryContext(TUniqueId query_id, int
total_fragment_num, ExecEnv*
clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
register_memory_statistics();
register_cpu_statistics();
+ DorisMetrics::instance()->query_ctx_cnt->increment(1);
}
void QueryContext::_init_query_mem_tracker() {
@@ -183,7 +184,7 @@ QueryContext::~QueryContext() {
obj_pool.clear();
_exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
-
+ DorisMetrics::instance()->query_ctx_cnt->increment(-1);
LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id),
mem_tracker_msg);
}
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 6e1485a81b7..165dfd632b9 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -179,6 +179,14 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(broker_file_open_reading, MetricUnit::FILESYS
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing,
MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing,
MetricUnit::FILESYSTEM);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed,
MetricUnit::NOUNIT);
+
const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -293,6 +301,14 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, broker_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing);
+
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_cnt);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_queued);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_running);
+ INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_submit_failed);
}
void DorisMetrics::initialize(bool init_system_metrics, const
std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 567efdc9ae5..21e92bb82df 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -228,11 +228,18 @@ public:
UIntGauge* group_local_scan_thread_pool_queue_size = nullptr;
UIntGauge* group_local_scan_thread_pool_thread_num = nullptr;
+ IntAtomicCounter* query_ctx_cnt = nullptr;
+ IntAtomicCounter* scanner_ctx_cnt = nullptr;
+ IntAtomicCounter* scanner_cnt = nullptr;
+ IntAtomicCounter* scanner_task_cnt = nullptr;
+ IntAtomicCounter* scanner_task_queued = nullptr;
+ IntAtomicCounter* scanner_task_submit_failed = nullptr;
+ IntAtomicCounter* scanner_task_running = nullptr;
+
static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
}
-
// not thread-safe, call before calling metrics
void initialize(
bool init_system_metrics = false,
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 77cdce5c4a2..0fa6c1dae1e 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -29,6 +29,7 @@
#include "common/status.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
@@ -138,6 +139,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VS
: ScannerContext(state, output_tuple_desc, output_row_descriptor,
scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances,
local_state) {
_parent = parent;
+
+ DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
// After init function call, should not access _parent
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 1fe2b5d9aca..fe72b740a1b 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -58,11 +58,13 @@ class ScanTask {
public:
ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) :
scanner(delegate_scanner) {
_query_thread_context.init();
+ DorisMetrics::instance()->scanner_task_cnt->increment(1);
}
~ScanTask() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
cached_blocks.clear();
+ DorisMetrics::instance()->scanner_task_cnt->increment(-1);
}
private:
@@ -116,6 +118,7 @@ public:
// do nothing
}
block.reset();
+ DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
}
virtual Status init();
@@ -155,7 +158,6 @@ public:
RuntimeState* state() { return _state; }
void incr_ctx_scheduling_time(int64_t num) {
_scanner_ctx_sched_time->update(num); }
-
std::string parent_name();
virtual bool empty_in_queue(int id);
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 01d50b4ff3c..fd2d1a14699 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -41,6 +41,7 @@
#include "util/cpu_info.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
+#include "util/metrics.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -140,6 +141,11 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func([scanner_ref = scan_task,
ctx]() {
+ DorisMetrics::instance()->scanner_task_queued->increment(-1);
+ DorisMetrics::instance()->scanner_task_running->increment(1);
+ Defer metrics_defer(
+ [&] {
DorisMetrics::instance()->scanner_task_running->increment(-1); });
+
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
@@ -171,6 +177,11 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
is_local ? _local_scan_thread_pool.get() :
_remote_scan_thread_pool.get();
}
auto work_func = [scanner_ref = scan_task, ctx]() {
+ DorisMetrics::instance()->scanner_task_queued->increment(-1);
+ DorisMetrics::instance()->scanner_task_running->increment(1);
+ Defer metrics_defer(
+ [&] {
DorisMetrics::instance()->scanner_task_running->increment(-1); });
+
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 439291f2107..f832e348088 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -18,9 +18,11 @@
#pragma once
#include <atomic>
+#include <cstdint>
#include <memory>
#include "common/status.h"
+#include "util/doris_metrics.h"
#include "util/threadpool.h"
#include "vec/exec/scan/vscanner.h"
@@ -135,7 +137,13 @@ public:
Status submit_scan_task(SimplifiedScanTask scan_task) {
if (!_is_stop) {
- return _scan_thread_pool->submit_func([scan_task] {
scan_task.scan_func(); });
+ DorisMetrics::instance()->scanner_task_queued->increment(1);
+ auto st = _scan_thread_pool->submit_func([scan_task] {
scan_task.scan_func(); });
+ if (!st.ok()) {
+ DorisMetrics::instance()->scanner_task_queued->increment(-1);
+
DorisMetrics::instance()->scanner_task_submit_failed->increment(1);
+ }
+ return st;
} else {
return Status::InternalError<false>("scanner pool {} is
shutdown.", _sched_name);
}
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 39f908cf169..97a2ba8207a 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -52,6 +52,7 @@ VScanner::VScanner(RuntimeState* state,
pipeline::ScanLocalStateBase* local_stat
_output_tuple_desc(_local_state->output_tuple_desc()),
_output_row_descriptor(_local_state->_parent->output_row_descriptor()) {
_total_rf_num = _local_state->runtime_filter_num();
+ DorisMetrics::instance()->scanner_cnt->increment(1);
}
Status VScanner::prepare(RuntimeState* state, const VExprContextSPtrs&
conjuncts) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index b42ec8d99d5..e85c6082ca6 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -69,6 +69,7 @@ public:
_origin_block.clear();
_common_expr_ctxs_push_down.clear();
_stale_expr_ctxs.clear();
+ DorisMetrics::instance()->scanner_cnt->increment(-1);
}
virtual Status init() { return Status::OK(); }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]