This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 196fadc044f [enhancement](metrics) enhance visibility of flush thread pool (#26544)
196fadc044f is described below

commit 196fadc044fa368a8581f6d705a0281eef58e991
Author: Siyang Tang <[email protected]>
AuthorDate: Sat Nov 11 19:53:24 2023 +0800

    [enhancement](metrics) enhance visibility of flush thread pool (#26544)
---
 be/src/olap/memtable_flush_executor.cpp    | 52 ++++++++++++++++++++--------
 be/src/olap/memtable_flush_executor.h      |  9 +++--
 be/src/util/doris_metrics.h                | 12 +++++++
 be/src/vec/exec/scan/scanner_scheduler.cpp | 55 ++++++++++++++++++++++++++----
 be/src/vec/exec/scan/scanner_scheduler.h   |  3 ++
 5 files changed, 106 insertions(+), 25 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index c5d74a6de5e..ac114291a0b 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -18,9 +18,9 @@
 #include "olap/memtable_flush_executor.h"
 
 #include <gen_cpp/olap_file.pb.h>
-#include <stddef.h>
 
 #include <algorithm>
+#include <cstddef>
 #include <ostream>
 
 #include "common/config.h"
@@ -29,12 +29,18 @@
 #include "olap/memtable.h"
 #include "olap/rowset/rowset_writer.h"
 #include "util/doris_metrics.h"
+#include "util/metrics.h"
 #include "util/stopwatch.hpp"
 #include "util/time.h"
 
 namespace doris {
 using namespace ErrorCode;
 
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT);
+
+bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
+
 class MemtableFlushTask final : public Runnable {
 public:
     MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
@@ -42,9 +48,11 @@ public:
             : _flush_token(flush_token),
               _memtable(std::move(memtable)),
               _segment_id(segment_id),
-              _submit_task_time(submit_task_time) {}
+              _submit_task_time(submit_task_time) {
+        g_flush_task_num << 1;
+    }
 
-    ~MemtableFlushTask() override = default;
+    ~MemtableFlushTask() override { g_flush_task_num << -1; }
 
     void run() override {
         _flush_token->_flush_memtable(_memtable.get(), _segment_id, _submit_task_time);
@@ -122,7 +130,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
     return Status::OK();
 }
 
-void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t submit_task_time) {
+void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
+                                 int64_t submit_task_time) {
     uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
     _stats.flush_wait_time_ns += flush_wait_time_ns;
     // If previous flush has failed, return directly
@@ -135,10 +144,10 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
 
     MonotonicStopWatch timer;
     timer.start();
-    size_t memory_usage = memtable->memory_usage();
+    size_t memory_usage = mem_table->memory_usage();
 
     int64_t flush_size;
-    Status s = _do_flush_memtable(memtable, segment_id, &flush_size);
+    Status s = _do_flush_memtable(mem_table, segment_id, &flush_size);
 
     {
         std::shared_lock rdlk(_flush_status_lock);
@@ -161,7 +170,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
     _stats.flush_time_ns += timer.elapsed_time();
     _stats.flush_finish_count++;
     _stats.flush_running_count--;
-    _stats.flush_size_bytes += memtable->memory_usage();
+    _stats.flush_size_bytes += mem_table->memory_usage();
     _stats.flush_disk_size_bytes += flush_size;
 }
 
@@ -180,6 +189,7 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
                               .set_min_threads(min_threads)
                               .set_max_threads(max_threads)
                               .build(&_high_prio_flush_pool));
+    _register_metrics();
 }
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
@@ -189,26 +199,38 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
     if (!is_high_priority) {
         if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
             // beta rowsets can be flushed in CONCURRENT mode because each memtable uses a new segment writer.
-            flush_token.reset(
-                    new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
+            flush_token = std::make_unique<FlushToken>(
+                    _flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
         } else {
             // alpha rowsets do not support flushing in CONCURRENT mode.
-            flush_token.reset(
-                    new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+            flush_token = std::make_unique<FlushToken>(
+                    _flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
         }
     } else {
         if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
             // beta rowsets can be flushed in CONCURRENT mode because each memtable uses a new segment writer.
-            flush_token.reset(new FlushToken(
-                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
+            flush_token = std::make_unique<FlushToken>(
+                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
         } else {
             // alpha rowsets do not support flushing in CONCURRENT mode.
-            flush_token.reset(new FlushToken(
-                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+            flush_token = std::make_unique<FlushToken>(
+                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
         }
     }
     flush_token->set_rowset_writer(rowset_writer);
     return Status::OK();
 }
 
+void MemTableFlushExecutor::_register_metrics() {
+    REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
+                         [this]() { return _flush_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
+                         [this]() { return _flush_pool->num_threads(); });
+}
+
+void MemTableFlushExecutor::_deregister_metrics() {
+    DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
+}
+
 } // namespace doris
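
The g_flush_task_num adder above gives a live count of in-flight flush tasks: the constructor of MemtableFlushTask adds one and the destructor subtracts one, so the counter tracks object lifetime instead of needing explicit bookkeeping on every exit path. Below is a minimal, self-contained sketch of that RAII counting pattern, assuming the brpc bvar library is linked; the task class and counter name are illustrative, not Doris code.

    #include <bvar/bvar.h>

    #include <memory>

    // Process-wide counter, exposed by bvar under this name.
    bvar::Adder<int64_t> g_inflight_task_num("inflight_task_num");

    class CountedTask {
    public:
        CountedTask() { g_inflight_task_num << 1; }   // one more task alive
        ~CountedTask() { g_inflight_task_num << -1; } // task finished or dropped
        void run() { /* real work would go here */ }
    };

    int main() {
        auto task = std::make_unique<CountedTask>();
        task->run();
        task.reset(); // counter drops back to 0
        return 0;
    }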
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index ee7194349f6..d2039ce8127 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -17,9 +17,8 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include <atomic>
+#include <cstdint>
 #include <iosfwd>
 #include <memory>
 #include <utility>
@@ -108,8 +107,9 @@ private:
 //      ...
 class MemTableFlushExecutor {
 public:
-    MemTableFlushExecutor() {}
+    MemTableFlushExecutor() = default;
     ~MemTableFlushExecutor() {
+        _deregister_metrics();
         _flush_pool->shutdown();
         _high_prio_flush_pool->shutdown();
     }
@@ -122,6 +122,9 @@ public:
                               bool should_serial, bool is_high_priority);
 
 private:
+    void _register_metrics();
+    static void _deregister_metrics();
+
     std::unique_ptr<ThreadPool> _flush_pool;
     std::unique_ptr<ThreadPool> _high_prio_flush_pool;
 };
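
As the NOTE before create_flush_token() explains, a SERIAL token guarantees that all mem-tables of one tablet are flushed in submission order even on a many-threaded pool, while a CONCURRENT token lets beta-rowset memtables flush in parallel since each uses its own segment writer. Here is a simplified sketch of how a serial token can enforce that ordering; it assumes nothing about the real Doris/Kudu ThreadPoolToken internals, and the detached-thread "pool" is only a stand-in.

    #include <chrono>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <thread>

    // Funnels tasks through one drain loop at a time, so they run in
    // submission order no matter how many threads the pool has.
    class SerialToken {
    public:
        explicit SerialToken(std::function<void(std::function<void()>)> post)
                : _post(std::move(post)) {}

        void submit(std::function<void()> task) {
            std::lock_guard<std::mutex> lk(_mu);
            _queue.push_back(std::move(task));
            if (!_draining) {
                _draining = true;            // at most one drain job is active
                _post([this] { _drain(); });
            }
        }

    private:
        void _drain() {
            for (;;) {
                std::function<void()> task;
                {
                    std::lock_guard<std::mutex> lk(_mu);
                    if (_queue.empty()) {
                        _draining = false;   // next submit() restarts the drain
                        return;
                    }
                    task = std::move(_queue.front());
                    _queue.pop_front();
                }
                task(); // strictly FIFO: one task at a time
            }
        }

        std::function<void(std::function<void()>)> _post;
        std::mutex _mu;
        std::deque<std::function<void()>> _queue;
        bool _draining = false;
    };

    int main() {
        // Stand-in "pool": every posted job gets its own detached thread.
        SerialToken token([](std::function<void()> job) { std::thread(std::move(job)).detach(); });
        for (int i = 0; i < 3; ++i) {
            token.submit([i] { std::cout << "flush " << i << "\n"; }); // prints 0, 1, 2 in order
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // crude wait, sketch only
        return 0;
    }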
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 60ca3b51072..409e6dae2b6 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -228,6 +228,18 @@ public:
     UIntGauge* heavy_work_max_threads;
     UIntGauge* light_work_max_threads;
 
+    UIntGauge* flush_thread_pool_queue_size;
+    UIntGauge* flush_thread_pool_thread_num;
+
+    UIntGauge* local_scan_thread_pool_queue_size;
+    UIntGauge* local_scan_thread_pool_thread_num;
+    UIntGauge* remote_scan_thread_pool_queue_size;
+    UIntGauge* remote_scan_thread_pool_thread_num;
+    UIntGauge* limited_scan_thread_pool_queue_size;
+    UIntGauge* limited_scan_thread_pool_thread_num;
+    UIntGauge* group_local_scan_thread_pool_queue_size;
+    UIntGauge* group_local_scan_thread_pool_thread_num;
+
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
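
These gauges are not pushed to by the pools themselves; REGISTER_HOOK_METRIC attaches a callback that reads the pool's current queue size or thread count each time metrics are collected. A simplified model of that pull-style hook gauge follows; Registry and HookGauge are hypothetical names standing in for the real MetricRegistry machinery.

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>

    class HookGauge {
    public:
        explicit HookGauge(std::function<uint64_t()> hook) : _hook(std::move(hook)) {}
        uint64_t value() const { return _hook(); } // evaluated lazily, per scrape
    private:
        std::function<uint64_t()> _hook;
    };

    class Registry {
    public:
        void register_hook(const std::string& name, std::function<uint64_t()> hook) {
            _gauges.emplace(name, HookGauge(std::move(hook)));
        }
        void deregister(const std::string& name) { _gauges.erase(name); }
        void scrape() const {
            for (const auto& [name, gauge] : _gauges) {
                std::cout << name << " " << gauge.value() << "\n";
            }
        }
    private:
        std::map<std::string, HookGauge> _gauges;
    };

    int main() {
        Registry registry;
        uint64_t queue_size = 7; // stand-in for _flush_pool->get_queue_size()
        registry.register_hook("flush_thread_pool_queue_size",
                               [&queue_size] { return queue_size; });
        registry.scrape(); // prints: flush_thread_pool_queue_size 7
        registry.deregister("flush_thread_pool_queue_size");
        return 0;
    }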
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 62fd5996659..e78e8dceffe 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -17,11 +17,11 @@
 
 #include "scanner_scheduler.h"
 
-#include <stdint.h>
-
 #include <algorithm>
+#include <cstdint>
 #include <functional>
 #include <list>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <typeinfo>
@@ -40,6 +40,7 @@
 #include "util/blocking_queue.hpp"
 #include "util/cpu_info.h"
 #include "util/defer_op.h"
+#include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
@@ -53,6 +54,15 @@
 
 namespace doris::vectorized {
 
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+
 ScannerScheduler::ScannerScheduler() = default;
 
 ScannerScheduler::~ScannerScheduler() {
@@ -64,6 +74,7 @@ ScannerScheduler::~ScannerScheduler() {
         delete _pending_queues[i];
     }
     delete[] _pending_queues;
+    _deregister_metrics();
 }
 
 void ScannerScheduler::stop() {
@@ -107,9 +118,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
     }
 
     // 2. local scan thread pool
-    _local_scan_thread_pool.reset(
-            new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-                                   config::doris_scanner_thread_pool_queue_size, "local_scan"));
+    _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
+            config::doris_scanner_thread_pool_thread_num,
+            config::doris_scanner_thread_pool_queue_size, "local_scan");
 
     // 3. remote scan thread pool
     static_cast<void>(
@@ -141,7 +152,7 @@ Status ScannerScheduler::init(ExecEnv* env) {
             this->_task_group_scanner_scan(this, _task_group_local_scan_queue.get());
         }));
     }
-
+    _register_metrics();
     _is_init = true;
     return Status::OK();
 }
@@ -179,7 +190,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
 }
 
 [[maybe_unused]] static void* run_scanner_bthread(void* arg) {
-    auto f = reinterpret_cast<std::function<void()>*>(arg);
+    auto* f = reinterpret_cast<std::function<void()>*>(arg);
     (*f)();
     delete f;
     return nullptr;
@@ -463,4 +474,34 @@ void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
     }
 }
 
+void ScannerScheduler::_register_metrics() {
+    REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
+                         [this]() { return _local_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num,
+                         [this]() { return _local_scan_thread_pool->get_active_threads(); });
+    REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
+                         [this]() { return _remote_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
+                         [this]() { return _remote_scan_thread_pool->num_threads(); });
+    REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
+                         [this]() { return _limited_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
+                         [this]() { return _limited_scan_thread_pool->num_threads(); });
+    REGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size,
+                         [this]() { return _group_local_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num,
+                         [this]() { return _group_local_scan_thread_pool->num_threads(); });
+}
+
+void ScannerScheduler::_deregister_metrics() {
+    DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num);
+    DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num);
+    DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
+    DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
+}
+
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 25f79e89aa2..a6d450bc221 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -86,6 +86,9 @@ private:
 
     void _task_group_scanner_scan(ScannerScheduler* scheduler,
                                   taskgroup::ScanTaskTaskGroupQueue* scan_queue);
+    void _register_metrics();
+
+    static void _deregister_metrics();
 
     // Scheduling queue number.
     // TODO: make it configurable.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
