This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 196fadc044f [enhancement](metrics) enhance visibility of flush thread pool (#26544)
196fadc044f is described below
commit 196fadc044fa368a8581f6d705a0281eef58e991
Author: Siyang Tang <[email protected]>
AuthorDate: Sat Nov 11 19:53:24 2023 +0800
[enhancement](metrics) enhance visibility of flush thread pool (#26544)
---
be/src/olap/memtable_flush_executor.cpp | 52 ++++++++++++++++++++--------
be/src/olap/memtable_flush_executor.h | 9 +++--
be/src/util/doris_metrics.h | 12 +++++++
be/src/vec/exec/scan/scanner_scheduler.cpp | 55 ++++++++++++++++++++++++++----
be/src/vec/exec/scan/scanner_scheduler.h | 3 ++
5 files changed, 106 insertions(+), 25 deletions(-)
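
The change wires hook-style gauges into both the memtable flush executor and the scanner scheduler: each metric is bound to a callback that reads the thread pool's current state only when metrics are collected, rather than being updated on every event. Below is a self-contained sketch of that idea; the registry, names, and collect() function are hypothetical stand-ins, not Doris's actual REGISTER_HOOK_METRIC machinery.

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <string>

// Hypothetical hook-gauge registry: each gauge is a callback that is only
// evaluated when metrics are collected, so no per-event bookkeeping is needed.
class HookGaugeRegistry {
public:
    void register_hook(const std::string& name, std::function<int64_t()> hook) {
        _hooks[name] = std::move(hook);
    }
    void deregister_hook(const std::string& name) { _hooks.erase(name); }

    // Called at collection time: evaluate every hook and emit name/value pairs.
    void collect(std::ostream& out) const {
        for (const auto& [name, hook] : _hooks) {
            out << name << " " << hook() << "\n";
        }
    }

private:
    std::map<std::string, std::function<int64_t()>> _hooks;
};

int main() {
    HookGaugeRegistry registry;
    int64_t queue_size = 42; // stand-in for ThreadPool::get_queue_size()
    registry.register_hook("flush_thread_pool_queue_size",
                           [&queue_size]() { return queue_size; });
    registry.collect(std::cout); // prints: flush_thread_pool_queue_size 42
    return 0;
}
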
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index c5d74a6de5e..ac114291a0b 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -18,9 +18,9 @@
#include "olap/memtable_flush_executor.h"
#include <gen_cpp/olap_file.pb.h>
-#include <stddef.h>
#include <algorithm>
+#include <cstddef>
#include <ostream>
#include "common/config.h"
@@ -29,12 +29,18 @@
#include "olap/memtable.h"
#include "olap/rowset/rowset_writer.h"
#include "util/doris_metrics.h"
+#include "util/metrics.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
namespace doris {
using namespace ErrorCode;
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT);
+
+bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
+
class MemtableFlushTask final : public Runnable {
public:
    MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
@@ -42,9 +48,11 @@ public:
: _flush_token(flush_token),
_memtable(std::move(memtable)),
_segment_id(segment_id),
- _submit_task_time(submit_task_time) {}
+ _submit_task_time(submit_task_time) {
+ g_flush_task_num << 1;
+ }
- ~MemtableFlushTask() override = default;
+ ~MemtableFlushTask() override { g_flush_task_num << -1; }
void run() override {
        _flush_token->_flush_memtable(_memtable.get(), _segment_id, _submit_task_time);
@@ -122,7 +130,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
return Status::OK();
}
-void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t submit_task_time) {
+void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
+ int64_t submit_task_time) {
uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
_stats.flush_wait_time_ns += flush_wait_time_ns;
// If previous flush has failed, return directly
@@ -135,10 +144,10 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
MonotonicStopWatch timer;
timer.start();
- size_t memory_usage = memtable->memory_usage();
+ size_t memory_usage = mem_table->memory_usage();
int64_t flush_size;
- Status s = _do_flush_memtable(memtable, segment_id, &flush_size);
+ Status s = _do_flush_memtable(mem_table, segment_id, &flush_size);
{
std::shared_lock rdlk(_flush_status_lock);
@@ -161,7 +170,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
_stats.flush_time_ns += timer.elapsed_time();
_stats.flush_finish_count++;
_stats.flush_running_count--;
- _stats.flush_size_bytes += memtable->memory_usage();
+ _stats.flush_size_bytes += mem_table->memory_usage();
_stats.flush_disk_size_bytes += flush_size;
}
@@ -180,6 +189,7 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_high_prio_flush_pool));
+ _register_metrics();
}
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
@@ -189,26 +199,38 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
if (!is_high_priority) {
if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
            // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
-            flush_token.reset(
-                    new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
+            flush_token = std::make_unique<FlushToken>(
+                    _flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
} else {
// alpha rowset do not support flush in CONCURRENT.
- flush_token.reset(
-                    new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+ flush_token = std::make_unique<FlushToken>(
+ _flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
}
} else {
if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
            // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
-            flush_token.reset(new FlushToken(
-                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
+            flush_token = std::make_unique<FlushToken>(
+                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
} else {
// alpha rowset do not support flush in CONCURRENT.
-            flush_token.reset(new FlushToken(
-                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+            flush_token = std::make_unique<FlushToken>(
+                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
}
}
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
+void MemTableFlushExecutor::_register_metrics() {
+ REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
+ [this]() { return _flush_pool->get_queue_size(); });
+ REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
+                         [this]() { return _flush_pool->num_threads(); });
+}
+
+void MemTableFlushExecutor::_deregister_metrics() {
+ DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
+}
+
} // namespace doris
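
Besides the two gauges, the file above adds g_flush_task_num, a bvar counter that the MemtableFlushTask constructor increments and the destructor decrements, so its value at any moment is the number of flush tasks currently alive (queued or running). A rough standalone equivalent of that RAII counting pattern, using std::atomic in place of bvar::Adder, is sketched below; the class and variable names are illustrative only.

#include <atomic>
#include <cstdint>

// Stand-in for bvar::Adder<int64_t>: a process-wide count of live tasks.
std::atomic<int64_t> g_live_flush_tasks{0};

class FlushTaskSketch {
public:
    FlushTaskSketch() { g_live_flush_tasks.fetch_add(1, std::memory_order_relaxed); }
    ~FlushTaskSketch() { g_live_flush_tasks.fetch_sub(1, std::memory_order_relaxed); }
    // run() would perform the actual memtable flush in the real code.
};

int main() {
    {
        FlushTaskSketch a, b; // two tasks alive; the counter reads 2 here
    }                         // both destroyed; the counter drops back to 0
    return g_live_flush_tasks.load() == 0 ? 0 : 1;
}
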
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index ee7194349f6..d2039ce8127 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -17,9 +17,8 @@
#pragma once
-#include <stdint.h>
-
#include <atomic>
+#include <cstdint>
#include <iosfwd>
#include <memory>
#include <utility>
@@ -108,8 +107,9 @@ private:
// ...
class MemTableFlushExecutor {
public:
- MemTableFlushExecutor() {}
+ MemTableFlushExecutor() = default;
~MemTableFlushExecutor() {
+ _deregister_metrics();
_flush_pool->shutdown();
_high_prio_flush_pool->shutdown();
}
@@ -122,6 +122,9 @@ public:
bool should_serial, bool is_high_priority);
private:
+ void _register_metrics();
+ static void _deregister_metrics();
+
std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
};
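
The destructor change here is ordering-sensitive: the hooks registered in _register_metrics() capture `this` and read through _flush_pool, so they are removed before the pools are shut down; otherwise a metrics collection racing with teardown could call into a destroyed pool. A hedged, self-contained illustration of that ordering follows, with a hypothetical global hook list standing in for the metrics registry.

#include <functional>
#include <memory>
#include <vector>

// Hypothetical global hook list standing in for the metrics registry.
std::vector<std::function<long()>> g_hooks;

struct PoolSketch {
    long get_queue_size() const { return 0; }
};

class ExecutorSketch {
public:
    ExecutorSketch() : _pool(std::make_unique<PoolSketch>()) {
        // The hook captures `this` and dereferences _pool at collection time.
        g_hooks.emplace_back([this]() { return _pool->get_queue_size(); });
    }
    ~ExecutorSketch() {
        g_hooks.clear(); // deregister first: no hook may touch _pool afterwards
        _pool.reset();   // only then tear the pool down
    }

private:
    std::unique_ptr<PoolSketch> _pool;
};

int main() {
    ExecutorSketch executor;
    return 0;
}
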
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 60ca3b51072..409e6dae2b6 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -228,6 +228,18 @@ public:
UIntGauge* heavy_work_max_threads;
UIntGauge* light_work_max_threads;
+ UIntGauge* flush_thread_pool_queue_size;
+ UIntGauge* flush_thread_pool_thread_num;
+
+ UIntGauge* local_scan_thread_pool_queue_size;
+ UIntGauge* local_scan_thread_pool_thread_num;
+ UIntGauge* remote_scan_thread_pool_queue_size;
+ UIntGauge* remote_scan_thread_pool_thread_num;
+ UIntGauge* limited_scan_thread_pool_queue_size;
+ UIntGauge* limited_scan_thread_pool_thread_num;
+ UIntGauge* group_local_scan_thread_pool_queue_size;
+ UIntGauge* group_local_scan_thread_pool_thread_num;
+
static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 62fd5996659..e78e8dceffe 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -17,11 +17,11 @@
#include "scanner_scheduler.h"
-#include <stdint.h>
-
#include <algorithm>
+#include <cstdint>
#include <functional>
#include <list>
+#include <memory>
#include <ostream>
#include <string>
#include <typeinfo>
@@ -40,6 +40,7 @@
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
#include "util/defer_op.h"
+#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -53,6 +54,15 @@
namespace doris::vectorized {
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+
ScannerScheduler::ScannerScheduler() = default;
ScannerScheduler::~ScannerScheduler() {
@@ -64,6 +74,7 @@ ScannerScheduler::~ScannerScheduler() {
delete _pending_queues[i];
}
delete[] _pending_queues;
+ _deregister_metrics();
}
void ScannerScheduler::stop() {
@@ -107,9 +118,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
}
// 2. local scan thread pool
-    _local_scan_thread_pool.reset(
-            new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-                                   config::doris_scanner_thread_pool_queue_size, "local_scan"));
+ _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
+ config::doris_scanner_thread_pool_thread_num,
+ config::doris_scanner_thread_pool_queue_size, "local_scan");
// 3. remote scan thread pool
static_cast<void>(
@@ -141,7 +152,7 @@ Status ScannerScheduler::init(ExecEnv* env) {
            this->_task_group_scanner_scan(this, _task_group_local_scan_queue.get());
}));
}
-
+ _register_metrics();
_is_init = true;
return Status::OK();
}
@@ -179,7 +190,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
}
[[maybe_unused]] static void* run_scanner_bthread(void* arg) {
- auto f = reinterpret_cast<std::function<void()>*>(arg);
+ auto* f = reinterpret_cast<std::function<void()>*>(arg);
(*f)();
delete f;
return nullptr;
@@ -463,4 +474,34 @@ void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
}
}
+void ScannerScheduler::_register_metrics() {
+    REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
+                         [this]() { return _local_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num,
+                         [this]() { return _local_scan_thread_pool->get_active_threads(); });
+    REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
+                         [this]() { return _remote_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
+                         [this]() { return _remote_scan_thread_pool->num_threads(); });
+    REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
+                         [this]() { return _limited_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
+                         [this]() { return _limited_scan_thread_pool->num_threads(); });
+    REGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size,
+                         [this]() { return _group_local_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num,
+                         [this]() { return _group_local_scan_thread_pool->num_threads(); });
+}
+
+void ScannerScheduler::_deregister_metrics() {
+ DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num);
+ DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num);
+ DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
+ DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
+}
+
} // namespace doris::vectorized
\ No newline at end of file
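
Two kinds of values are exported per scan pool above: get_queue_size() counts tasks still waiting to be picked up, while num_threads() / get_active_threads() count workers. The minimal pool below is a hypothetical sketch, not Doris's ThreadPool or PriorityThreadPool, showing where those two numbers come from.

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical fixed-size pool exposing the two values the new gauges read.
class MiniPool {
public:
    explicit MiniPool(size_t n) {
        for (size_t i = 0; i < n; ++i) {
            _workers.emplace_back([this]() { _work_loop(); });
        }
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(_mu);
            _stop = true;
        }
        _cv.notify_all();
        for (auto& t : _workers) {
            t.join();
        }
    }

    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(_mu);
            _queue.push(std::move(task));
        }
        _cv.notify_one();
    }

    // What a *_queue_size gauge would read: tasks not yet picked up by a worker.
    size_t get_queue_size() const {
        std::lock_guard<std::mutex> lock(_mu);
        return _queue.size();
    }

    // What a *_thread_num gauge would read: workers owned by the pool.
    size_t num_threads() const { return _workers.size(); }

private:
    void _work_loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(_mu);
                _cv.wait(lock, [this]() { return _stop || !_queue.empty(); });
                if (_stop && _queue.empty()) {
                    return;
                }
                task = std::move(_queue.front());
                _queue.pop();
            }
            task();
        }
    }

    mutable std::mutex _mu;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _queue;
    std::vector<std::thread> _workers;
    bool _stop = false;
};

int main() {
    MiniPool pool(2);
    pool.submit([]() {});
    return pool.num_threads() == 2 ? 0 : 1;
}
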
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 25f79e89aa2..a6d450bc221 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -86,6 +86,9 @@ private:
void _task_group_scanner_scan(ScannerScheduler* scheduler,
                                  taskgroup::ScanTaskTaskGroupQueue* scan_queue);
+ void _register_metrics();
+
+ static void _deregister_metrics();
// Scheduling queue number.
// TODO: make it configurable.
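
One detail visible in this header: _register_metrics() is a normal member function because the registration lambdas capture `this`, while _deregister_metrics() is static because deregistration only needs the metric names on the global DorisMetrics instance. A small hypothetical illustration of that asymmetry:

#include <functional>
#include <map>
#include <string>

// Hypothetical registry keyed by metric name (stand-in for DorisMetrics).
std::map<std::string, std::function<long()>> g_registry;

class SchedulerSketch {
public:
    // Needs `this`: the hook captures the scheduler to read live pool state.
    void register_metrics() {
        g_registry["local_scan_thread_pool_queue_size"] = [this]() { return _queue_size; };
    }

    // Needs no `this`: removal is by name on the global registry.
    static void deregister_metrics() {
        g_registry.erase("local_scan_thread_pool_queue_size");
    }

private:
    long _queue_size = 0;
};

int main() {
    SchedulerSketch scheduler;
    scheduler.register_metrics();
    SchedulerSketch::deregister_metrics();
    return g_registry.empty() ? 0 : 1;
}
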
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]