This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new d83c928123a [enhancement](metrics) enhance visibility of flush thread pool (#26544) (#26819)
d83c928123a is described below
commit d83c928123a4fadf2a86d690b88a754dbc57bbe9
Author: Siyang Tang <[email protected]>
AuthorDate: Sun Nov 12 12:15:43 2023 +0800
[enhancement](metrics) enhance visibility of flush thread pool (#26544) (#26819)
---
be/src/olap/memtable_flush_executor.cpp | 39 ++++++++++++++++++------
be/src/olap/memtable_flush_executor.h | 9 ++++--
be/src/util/doris_metrics.h | 10 +++++++
be/src/vec/exec/scan/scanner_scheduler.cpp | 48 ++++++++++++++++++++++++++----
be/src/vec/exec/scan/scanner_scheduler.h | 5 +++-
5 files changed, 92 insertions(+), 19 deletions(-)
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index 57c2efb5294..13952697a16 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -18,29 +18,37 @@
#include "olap/memtable_flush_executor.h"
#include <gen_cpp/olap_file.pb.h>
-#include <stddef.h>
#include <algorithm>
+#include <cstddef>
#include <ostream>
#include "common/config.h"
#include "common/logging.h"
#include "olap/memtable.h"
-#include "util/stopwatch.hpp"
-#include "util/time.h"
+#include "olap/rowset/rowset_writer.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
namespace doris {
using namespace ErrorCode;
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num,
MetricUnit::NOUNIT);
+
+bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
+
class MemtableFlushTask final : public Runnable {
public:
MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable>
memtable,
int64_t submit_task_time)
: _flush_token(flush_token),
_memtable(std::move(memtable)),
- _submit_task_time(submit_task_time) {}
+ _submit_task_time(submit_task_time) {
+ g_flush_task_num << 1;
+ }
- ~MemtableFlushTask() override = default;
+ ~MemtableFlushTask() override { g_flush_task_num << -1; }
void run() override {
_flush_token->_flush_memtable(_memtable.get(), _submit_task_time);
@@ -144,10 +152,11 @@ void MemTableFlushExecutor::init(const
std::vector<DataDir*>& data_dirs) {
min_threads = std::max(1,
config::high_priority_flush_thread_num_per_store);
max_threads = data_dir_num * min_threads;
- ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
- .set_min_threads(min_threads)
- .set_max_threads(max_threads)
- .build(&_high_prio_flush_pool);
+ static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
+ .set_min_threads(min_threads)
+ .set_max_threads(max_threads)
+ .build(&_high_prio_flush_pool));
+ _register_metrics();
}
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are
flushed in order.
@@ -178,4 +187,16 @@ Status
MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* fl
return Status::OK();
}
+void MemTableFlushExecutor::_register_metrics() {
+ REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
+ [this]() { return _flush_pool->get_queue_size(); });
+ REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
+ [this]() { return _flush_pool->num_threads(); })
+}
+
+void MemTableFlushExecutor::_deregister_metrics() {
+ DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
+}
+
} // namespace doris
diff --git a/be/src/olap/memtable_flush_executor.h
b/be/src/olap/memtable_flush_executor.h
index 181b63de729..4c8a654c08c 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -17,9 +17,8 @@
#pragma once
-#include <stdint.h>
-
#include <atomic>
+#include <cstdint>
#include <iosfwd>
#include <memory>
#include <utility>
@@ -97,8 +96,9 @@ private:
// ...
class MemTableFlushExecutor {
public:
- MemTableFlushExecutor() {}
+ MemTableFlushExecutor() = default;
~MemTableFlushExecutor() {
+ _deregister_metrics();
_flush_pool->shutdown();
_high_prio_flush_pool->shutdown();
}
@@ -111,6 +111,9 @@ public:
bool should_serial, bool is_high_priority);
private:
+ void _register_metrics();
+ static void _deregister_metrics();
+
std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
};
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 646a4449c0a..023cf75f5b4 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -227,6 +227,16 @@ public:
UIntGauge* heavy_work_max_threads;
UIntGauge* light_work_max_threads;
+ UIntGauge* flush_thread_pool_queue_size;
+ UIntGauge* flush_thread_pool_thread_num;
+
+ UIntGauge* local_scan_thread_pool_queue_size;
+ UIntGauge* local_scan_thread_pool_thread_num;
+ UIntGauge* remote_scan_thread_pool_queue_size;
+ UIntGauge* remote_scan_thread_pool_thread_num;
+ UIntGauge* limited_scan_thread_pool_queue_size;
+ UIntGauge* limited_scan_thread_pool_thread_num;
+
static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 504c46c4016..f63b04692fd 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -17,11 +17,11 @@
#include "scanner_scheduler.h"
-#include <stdint.h>
-
#include <algorithm>
+#include <cstdint>
#include <functional>
#include <list>
+#include <memory>
#include <ostream>
#include <string>
#include <typeinfo>
@@ -40,6 +40,7 @@
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
#include "util/defer_op.h"
+#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -53,6 +54,13 @@
namespace doris::vectorized {
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num,
MetricUnit::NOUNIT);
+
ScannerScheduler::ScannerScheduler() = default;
ScannerScheduler::~ScannerScheduler() {
@@ -66,6 +74,8 @@ ScannerScheduler::~ScannerScheduler() {
_is_closed = true;
+ _deregister_metrics();
+
_scheduler_pool->shutdown();
_local_scan_thread_pool->shutdown();
_remote_scan_thread_pool->shutdown();
@@ -94,9 +104,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
}
// 2. local scan thread pool
- _local_scan_thread_pool.reset(
- new
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-
config::doris_scanner_thread_pool_queue_size, "local_scan"));
+ _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
+ config::doris_scanner_thread_pool_thread_num,
+ config::doris_scanner_thread_pool_queue_size, "local_scan");
// 3. remote scan thread pool
_remote_thread_pool_max_size =
config::doris_max_remote_scanner_thread_pool_thread_num != -1
@@ -115,6 +125,8 @@ Status ScannerScheduler::init(ExecEnv* env) {
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_limited_scan_thread_pool);
+ _register_metrics();
+
_is_init = true;
return Status::OK();
}
@@ -152,7 +164,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
}
[[maybe_unused]] static void* run_scanner_bthread(void* arg) {
- auto f = reinterpret_cast<std::function<void()>*>(arg);
+ auto* f = reinterpret_cast<std::function<void()>*>(arg);
(*f)();
delete f;
return nullptr;
@@ -402,4 +414,28 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
ctx->push_back_scanner_and_reschedule(scanner);
}
+void ScannerScheduler::_register_metrics() {
+ REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
+ [this]() { return
_local_scan_thread_pool->get_queue_size(); });
+ REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num,
+ [this]() { return
_local_scan_thread_pool->get_active_threads(); });
+ REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
+ [this]() { return
_remote_scan_thread_pool->get_queue_size(); });
+ REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
+ [this]() { return
_remote_scan_thread_pool->num_threads(); });
+ REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
+ [this]() { return
_limited_scan_thread_pool->get_queue_size(); });
+ REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
+ [this]() { return
_limited_scan_thread_pool->num_threads(); });
+}
+
+void ScannerScheduler::_deregister_metrics() {
+ DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num);
+ DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num);
+ DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 81dcf5f8b76..df81fcf8b47 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -77,7 +77,10 @@ private:
// execution thread function
void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx,
VScannerSPtr scanner);
-private:
+ void _register_metrics();
+
+ static void _deregister_metrics();
+
// Scheduling queue number.
// TODO: make it configurable.
static const int QUEUE_NUM = 4;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org