This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 673f4628eb6 [feat](metrics) Unify metrics of thread pool (#43144)
673f4628eb6 is described below
commit 673f4628eb645558c717fc2ea620e469ab96ecbb
Author: zhiqiang <[email protected]>
AuthorDate: Wed Jan 1 16:06:59 2025 +0800
[feat](metrics) Unify metrics of thread pool (#43144)
### What problem does this PR solve?
Add metrics for all thread pools, more specifically, for all `ThreadPool`
objects.
Every thread pool will have the following metrics:
1. thread_pool_active_threads
2. thread_pool_queue_size
3. thread_pool_max_queue_size
4. thread_pool_max_threads
5. task_execution_time_ns_avg_in_last_1000_times
6. task_wait_worker_ns_avg_in_last_1000_times
7. thread_pool_submit_failed
A new class `IntervalHistogramStat` is created for interval histogram
calculation.
Metrics are updated by a `hook` method when they are needed by Prometheus.
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/olap/memtable_flush_executor.cpp | 16 -----
be/src/olap/memtable_flush_executor.h | 4 --
be/src/runtime/exec_env.h | 3 +
be/src/runtime/exec_env_init.cpp | 42 +-----------
be/src/runtime/fragment_mgr.cpp | 10 +--
be/src/runtime/workload_group/workload_group.cpp | 4 +-
be/src/util/doris_metrics.cpp | 6 --
be/src/util/doris_metrics.h | 22 ------
be/src/util/interval_histogram.cpp | 80 ++++++++++++++++++++++
be/src/util/interval_histogram.h | 46 +++++++++++++
be/src/util/metrics.cpp | 5 +-
be/src/util/threadpool.cpp | 69 +++++++++++++++++--
be/src/util/threadpool.h | 30 +++++++-
be/src/vec/exec/scan/scanner_scheduler.cpp | 54 +--------------
be/src/vec/exec/scan/scanner_scheduler.h | 19 +++--
be/test/io/fs/buffered_reader_test.cpp | 12 ++--
be/test/io/fs/remote_file_system_test.cpp | 9 +--
be/test/io/fs/s3_file_writer_test.cpp | 1 +
be/test/olap/rowset/beta_rowset_test.cpp | 2 +
.../rowset/unique_rowset_id_generator_test.cpp | 1 +
be/test/testutil/run_all_tests.cpp | 12 ++++
be/test/util/countdown_latch_test.cpp | 1 +
be/test/util/interval_histogram_test.cpp | 78 +++++++++++++++++++++
.../agg_linear_histogram_test.cpp | 4 +-
.../workload_manager_p0/test_curd_wlg.groovy | 56 +++++++++++++++
25 files changed, 402 insertions(+), 184 deletions(-)
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index 5cdb45281b9..0181cc1d64d 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -37,9 +37,6 @@
namespace doris {
using namespace ErrorCode;
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num,
MetricUnit::NOUNIT);
-
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
class MemtableFlushTask final : public Runnable {
@@ -239,7 +236,6 @@ void MemTableFlushExecutor::init(int num_disk) {
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_high_prio_flush_pool));
- _register_metrics();
}
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are
flushed in order.
@@ -263,16 +259,4 @@ Status
MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl
}
}
-void MemTableFlushExecutor::_register_metrics() {
- REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
- [this]() { return _flush_pool->get_queue_size(); });
- REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
- [this]() { return _flush_pool->num_threads(); })
-}
-
-void MemTableFlushExecutor::_deregister_metrics() {
- DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
- DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
-}
-
} // namespace doris
diff --git a/be/src/olap/memtable_flush_executor.h
b/be/src/olap/memtable_flush_executor.h
index 27e8e8a9b0e..753f1106646 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -127,7 +127,6 @@ class MemTableFlushExecutor {
public:
MemTableFlushExecutor() = default;
~MemTableFlushExecutor() {
- _deregister_metrics();
_flush_pool->shutdown();
_high_prio_flush_pool->shutdown();
}
@@ -141,9 +140,6 @@ public:
std::shared_ptr<WorkloadGroup> wg_sptr);
private:
- void _register_metrics();
- static void _deregister_metrics();
-
std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
};
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0c9a4158ebc..12d625da3bf 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -300,6 +300,9 @@ public:
_s_tracking_memory.store(tracking_memory, std::memory_order_release);
}
void set_orc_memory_pool(orc::MemoryPool* pool) { _orc_memory_pool = pool;
}
+ void set_non_block_close_thread_pool(std::unique_ptr<ThreadPool>&& pool) {
+ _non_block_close_thread_pool = std::move(pool);
+ }
#endif
LoadStreamMapPool* load_stream_map_pool() { return
_load_stream_map_pool.get(); }
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index df66315ff05..f0d8253254f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -118,31 +118,10 @@
#include "runtime/memory/tcmalloc_hook.h"
#endif
-// Used for unit test
-namespace {
-std::once_flag flag;
-std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool;
-void init_threadpool_for_test() {
- static_cast<void>(doris::ThreadPoolBuilder("NonBlockCloseThreadPool")
- .set_min_threads(12)
- .set_max_threads(48)
- .build(&non_block_close_thread_pool));
-}
-
-[[maybe_unused]] doris::ThreadPool* get_non_block_close_thread_pool() {
- std::call_once(flag, init_threadpool_for_test);
- return non_block_close_thread_pool.get();
-}
-} // namespace
-
namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size,
MetricUnit::NOUNIT);
-
static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
bool init_system_metrics = config::enable_system_metrics;
std::set<std::string> disk_devices;
@@ -178,11 +157,7 @@ static pair<size_t, size_t> get_num_threads(size_t
min_num, size_t max_num) {
}
ThreadPool* ExecEnv::non_block_close_thread_pool() {
-#ifdef BE_TEST
- return get_non_block_close_thread_pool();
-#else
return _non_block_close_thread_pool.get();
-#endif
}
ExecEnv::ExecEnv() = default;
@@ -342,7 +317,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
RETURN_IF_ERROR(_wal_manager->init());
_heartbeat_flags = new HeartbeatFlags();
- _register_metrics();
_tablet_schema_cache =
TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);
@@ -672,19 +646,6 @@ Status ExecEnv::_check_deploy_mode() {
return Status::OK();
}
-void ExecEnv::_register_metrics() {
- REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
- [this]() { return
_send_batch_thread_pool->num_threads(); });
-
- REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
- [this]() { return
_send_batch_thread_pool->get_queue_size(); });
-}
-
-void ExecEnv::_deregister_metrics() {
- DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
- DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
- DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
-}
#ifdef BE_TEST
void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&&
new_load_stream_mgr) {
this->_new_load_stream_mgr = std::move(new_load_stream_mgr);
@@ -740,6 +701,7 @@ void ExecEnv::destroy() {
SAFE_STOP(_fragment_mgr);
SAFE_STOP(_runtime_filter_timer_queue);
// NewLoadStreamMgr should be destoried before storage_engine & after
fragment_mgr stopped.
+ _load_stream_mgr.reset();
_new_load_stream_mgr.reset();
_stream_load_executor.reset();
_memtable_memory_limiter.reset();
@@ -762,8 +724,8 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_non_block_close_thread_pool);
SAFE_SHUTDOWN(_s3_file_system_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);
+ SAFE_SHUTDOWN(_send_table_stats_thread_pool);
- _deregister_metrics();
SAFE_DELETE(_load_channel_mgr);
SAFE_DELETE(_spill_stream_mgr);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 19e8f76366c..60b7856d6aa 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -97,8 +97,7 @@ namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_num_active_threads,
MetricUnit::NOUNIT);
+
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr",
"prepare");
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
@@ -243,11 +242,6 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
.set_max_threads(config::fragment_mgr_asynic_work_pool_thread_num_max)
.set_max_queue_size(config::fragment_mgr_asynic_work_pool_queue_size)
.build(&_thread_pool);
-
- REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
- [this]() { return _thread_pool->get_queue_size(); });
- REGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads,
- [this]() { return _thread_pool->num_active_threads();
});
CHECK(s.ok()) << s.to_string();
}
@@ -255,8 +249,6 @@ FragmentMgr::~FragmentMgr() = default;
void FragmentMgr::stop() {
DEREGISTER_HOOK_METRIC(fragment_instance_count);
- DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
- DEREGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads);
_stop_background_threads_latch.count_down();
if (_cancel_thread) {
_cancel_thread->join();
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 6b9388af30a..3ceeed2de19 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -487,7 +487,7 @@ void
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
if (_scan_task_sched == nullptr) {
std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_"
+ wg_name,
-
cg_cpu_ctl_ptr);
+
cg_cpu_ctl_ptr, wg_name);
Status ret =
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);
@@ -507,7 +507,7 @@ void
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
std::unique_ptr<vectorized::SimplifiedScanScheduler>
remote_scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_"
+ wg_name,
-
cg_cpu_ctl_ptr);
+
cg_cpu_ctl_ptr, wg_name);
Status ret = remote_scan_scheduler->start(remote_max_thread_num,
config::doris_scanner_min_thread_pool_thread_num,
remote_scan_thread_queue_size);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index e77ee1c36b6..39f246d98d3 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt,
MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed,
MetricUnit::NOUNIT);
const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
- INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued);
- INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running);
- INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
scanner_task_submit_failed);
}
void DorisMetrics::initialize(bool init_system_metrics, const
std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index d089758c21c..26bad02fdd2 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -197,13 +197,6 @@ public:
UIntGauge* query_cache_sql_total_count = nullptr;
UIntGauge* query_cache_partition_total_count = nullptr;
- UIntGauge* scanner_thread_pool_queue_size = nullptr;
- UIntGauge* add_batch_task_queue_size = nullptr;
- UIntGauge* send_batch_thread_pool_thread_num = nullptr;
- UIntGauge* send_batch_thread_pool_queue_size = nullptr;
- UIntGauge* fragment_thread_pool_queue_size = nullptr;
- UIntGauge* fragment_thread_pool_num_active_threads = nullptr;
-
// Upload metrics
UIntGauge* upload_total_byte = nullptr;
IntCounter* upload_rowset_count = nullptr;
@@ -224,18 +217,6 @@ public:
UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr;
UIntGauge* arrow_flight_work_max_threads = nullptr;
- UIntGauge* flush_thread_pool_queue_size = nullptr;
- UIntGauge* flush_thread_pool_thread_num = nullptr;
-
- UIntGauge* local_scan_thread_pool_queue_size = nullptr;
- UIntGauge* local_scan_thread_pool_thread_num = nullptr;
- UIntGauge* remote_scan_thread_pool_queue_size = nullptr;
- UIntGauge* remote_scan_thread_pool_thread_num = nullptr;
- UIntGauge* limited_scan_thread_pool_queue_size = nullptr;
- UIntGauge* limited_scan_thread_pool_thread_num = nullptr;
- UIntGauge* group_local_scan_thread_pool_queue_size = nullptr;
- UIntGauge* group_local_scan_thread_pool_thread_num = nullptr;
-
IntCounter* num_io_bytes_read_total = nullptr;
IntCounter* num_io_bytes_read_from_cache = nullptr;
IntCounter* num_io_bytes_read_from_remote = nullptr;
@@ -244,9 +225,6 @@ public:
IntCounter* scanner_ctx_cnt = nullptr;
IntCounter* scanner_cnt = nullptr;
IntCounter* scanner_task_cnt = nullptr;
- IntCounter* scanner_task_queued = nullptr;
- IntCounter* scanner_task_submit_failed = nullptr;
- IntCounter* scanner_task_running = nullptr;
static DorisMetrics* instance() {
static DorisMetrics instance;
diff --git a/be/src/util/interval_histogram.cpp
b/be/src/util/interval_histogram.cpp
new file mode 100644
index 00000000000..ec894fc69fa
--- /dev/null
+++ b/be/src/util/interval_histogram.cpp
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/interval_histogram.h"
+
+#include <algorithm>
+#include <mutex>
+#include <numeric>
+#include <vector>
+
+#include "gutil/integral_types.h"
+
+namespace doris {
+
+template <typename T>
+IntervalHistogramStat<T>::IntervalHistogramStat(size_t N) : window(N) {}
+
+template <typename T>
+void IntervalHistogramStat<T>::add(T value) {
+ std::unique_lock<std::shared_mutex> lock(mutex);
+ if (window.full()) {
+ window.pop_front();
+ }
+ window.push_back(value);
+}
+
+template <typename T>
+T IntervalHistogramStat<T>::mean() {
+ std::shared_lock<std::shared_mutex> lock(mutex);
+ if (window.empty()) {
+ return T();
+ }
+ T sum = std::accumulate(window.begin(), window.end(), T());
+ return sum / window.size();
+}
+
+template <typename T>
+T IntervalHistogramStat<T>::median() {
+ std::shared_lock<std::shared_mutex> lock(mutex);
+ if (window.empty()) {
+ return T();
+ }
+
+ std::vector<T> sorted(window.begin(), window.end());
+ std::sort(sorted.begin(), sorted.end());
+
+ size_t mid = sorted.size() / 2;
+ return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 :
sorted[mid];
+}
+
+template <typename T>
+T IntervalHistogramStat<T>::max() {
+ std::shared_lock<std::shared_mutex> lock(mutex);
+ return *std::max_element(window.begin(), window.end());
+}
+
+template <typename T>
+T IntervalHistogramStat<T>::min() {
+ std::shared_lock<std::shared_mutex> lock(mutex);
+ return *std::min_element(window.begin(), window.end());
+}
+
+template class doris::IntervalHistogramStat<int64>;
+template class doris::IntervalHistogramStat<int32>;
+
+} // namespace doris
diff --git a/be/src/util/interval_histogram.h b/be/src/util/interval_histogram.h
new file mode 100644
index 00000000000..2d5d9e6c6d2
--- /dev/null
+++ b/be/src/util/interval_histogram.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <boost/circular_buffer.hpp>
+#include <shared_mutex>
+
+namespace doris {
+
+// A thread-safe interval histogram stat class.
+// IntervalHistogramStat will keep a FIXED-SIZE window of values and provide
+// statistics like mean, median, max, min.
+
+template <typename T>
+class IntervalHistogramStat {
+public:
+ explicit IntervalHistogramStat(size_t N);
+
+ void add(T value);
+
+ T mean();
+ T median();
+ T max();
+ T min();
+
+private:
+ boost::circular_buffer<T> window;
+ mutable std::shared_mutex mutex;
+};
+
+} // namespace doris
diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp
index 23dbb628a0d..1a3aa51cd2b 100644
--- a/be/src/util/metrics.cpp
+++ b/be/src/util/metrics.cpp
@@ -169,6 +169,7 @@ std::string HistogramMetric::to_string() const {
std::string HistogramMetric::to_prometheus(const std::string& display_name,
const Labels& entity_labels,
const Labels& metric_labels) const {
+    // TODO: Use std::string concatenation for better performance.
std::stringstream ss;
for (const auto& percentile : _s_output_percentiles) {
auto quantile_lable = Labels({{"quantile", percentile.first}});
@@ -322,12 +323,12 @@ void MetricRegistry::trigger_all_hooks(bool force) const {
std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const {
// Reorder by MetricPrototype
EntityMetricsByType entity_metrics_by_types;
- std::lock_guard<std::mutex> l(_lock);
+ std::lock_guard<std::mutex> l1(_lock);
for (const auto& entity : _entities) {
if (entity.first->_type == MetricEntityType::kTablet &&
!with_tablet_metrics) {
continue;
}
- std::lock_guard<std::mutex> l(entity.first->_lock);
+ std::lock_guard<std::mutex> l2(entity.first->_lock);
entity.first->trigger_hook_unlocked(false);
for (const auto& metric : entity.first->_metrics) {
std::pair<MetricEntity*, Metric*> new_elem =
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index e9af13f556e..5548ad7f400 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -33,10 +33,23 @@
#include "gutil/port.h"
#include "gutil/strings/substitute.h"
#include "util/debug/sanitizer_scopes.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
#include "util/scoped_cleanup.h"
+#include "util/stopwatch.hpp"
#include "util/thread.h"
namespace doris {
+// The names of these variables will be used as metric names in Prometheus.
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads,
MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times,
+ MetricUnit::NANOSECONDS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times,
+ MetricUnit::NANOSECONDS);
using namespace ErrorCode;
using std::string;
@@ -52,8 +65,9 @@ private:
std::function<void()> _func;
};
-ThreadPoolBuilder::ThreadPoolBuilder(string name)
+ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
: _name(std::move(name)),
+ _workload_group(std::move(workload_group)),
_min_threads(0),
_max_threads(std::thread::hardware_concurrency()),
_max_queue_size(std::numeric_limits<int>::max()),
@@ -238,6 +252,7 @@ bool ThreadPoolToken::need_dispatch() {
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
: _name(builder._name),
+ _workload_group(builder._workload_group),
_min_threads(builder._min_threads),
_max_threads(builder._max_threads),
_max_queue_size(builder._max_queue_size),
@@ -248,7 +263,8 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
_active_threads(0),
_total_queued_tasks(0),
_cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
- _tokenless(new_token(ExecutionMode::CONCURRENT)) {}
+ _tokenless(new_token(ExecutionMode::CONCURRENT)),
+ _id(UniqueId::gen_uid()) {}
ThreadPool::~ThreadPool() {
// There should only be one live token: the one used in tokenless
submission.
@@ -270,10 +286,48 @@ Status ThreadPool::init() {
return status;
}
}
+    // The _id of a thread pool makes sure that pools created with the same name still get
+    // different _metric_entity objects.
+    // If not, we would have problems when we deregister the entity and register the hook.
+ _metric_entity =
DorisMetrics::instance()->metric_registry()->register_entity(
+ fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
+ {"workload_group",
_workload_group},
+ {"id", _id.to_string()}});
+
+ INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
+ INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
+ INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
+ INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
+ INT_GAUGE_METRIC_REGISTER(_metric_entity,
task_execution_time_ns_avg_in_last_1000_times);
+ INT_GAUGE_METRIC_REGISTER(_metric_entity,
task_wait_worker_ns_avg_in_last_1000_times);
+ INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);
+
+ _metric_entity->register_hook("update", [this]() {
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ if (!_pool_status.ok()) {
+ return;
+ }
+ }
+
+ thread_pool_active_threads->set_value(num_active_threads());
+ thread_pool_queue_size->set_value(get_queue_size());
+ thread_pool_max_queue_size->set_value(get_max_queue_size());
+ thread_pool_max_threads->set_value(max_threads());
+ task_execution_time_ns_avg_in_last_1000_times->set_value(
+ _task_execution_time_ns_statistic.mean());
+ task_wait_worker_ns_avg_in_last_1000_times->set_value(
+ _task_wait_worker_time_ns_statistic.mean());
+ });
return Status::OK();
}
void ThreadPool::shutdown() {
+    // Why is access to DorisMetrics safe here?
+    // Since DorisMetrics is a singleton, it is destroyed only after doris_main has exited.
+    // The shutdown/destruction of ThreadPool is guaranteed by ExecEnv::destroy() to take
+    // place before doris_main exits.
+
DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
@@ -357,8 +411,6 @@ Status ThreadPool::submit_func(std::function<void()> f) {
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken*
token) {
DCHECK(token);
- std::chrono::time_point<std::chrono::system_clock> submit_time =
- std::chrono::system_clock::now();
std::unique_lock<std::mutex> l(_lock);
if (PREDICT_FALSE(!_pool_status.ok())) {
@@ -373,6 +425,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r,
ThreadPoolToken* token
int64_t capacity_remaining = static_cast<int64_t>(_max_threads) -
_active_threads +
static_cast<int64_t>(_max_queue_size) -
_total_queued_tasks;
if (capacity_remaining < 1) {
+ thread_pool_submit_failed->increment(1);
return Status::Error<SERVICE_UNAVAILABLE>(
"Thread pool {} is at capacity ({}/{} tasks running, {}/{}
tasks queued)", _name,
_num_threads + _num_threads_pending_start, _max_threads,
_total_queued_tasks,
@@ -408,7 +461,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r,
ThreadPoolToken* token
Task task;
task.runnable = std::move(r);
- task.submit_time = submit_time;
+ task.submit_time_wather.start();
// Add the task to the token's queue.
ThreadPoolToken::State state = token->state();
@@ -528,12 +581,15 @@ void ThreadPool::dispatch_thread() {
continue;
}
+ MonotonicStopWatch task_execution_time_watch;
+ task_execution_time_watch.start();
// Get the next token and task to execute.
ThreadPoolToken* token = _queue.front();
_queue.pop_front();
DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
DCHECK(!token->_entries.empty());
Task task = std::move(token->_entries.front());
+
_task_wait_worker_time_ns_statistic.add(task.submit_time_wather.elapsed_time());
token->_entries.pop_front();
token->_active_threads++;
--_total_queued_tasks;
@@ -543,7 +599,6 @@ void ThreadPool::dispatch_thread() {
// Execute the task
task.runnable->run();
-
// Destruct the task while we do not hold the lock.
//
// The task's destructor may be expensive if it has a lot of bound
@@ -552,7 +607,7 @@ void ThreadPool::dispatch_thread() {
// with this threadpool, and produce a deadlock.
task.runnable.reset();
l.lock();
-
+
_task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time());
// Possible states:
// 1. The token was shut down while we ran its task. Transition to
QUIESCED.
// 2. The token has no more queued tasks. Transition back to IDLE.
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index f822c307aa6..8c89f570a0a 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -20,6 +20,8 @@
#pragma once
+#include <gen_cpp/Types_types.h>
+
#include <boost/intrusive/detail/algo_type.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
@@ -38,6 +40,9 @@
#include "agent/cgroup_cpu_ctl.h"
#include "common/status.h"
+#include "util/interval_histogram.h"
+#include "util/metrics.h"
+#include "util/uid_util.h"
#include "util/work_thread_pool.hpp"
namespace doris {
@@ -99,7 +104,7 @@ public:
//
class ThreadPoolBuilder {
public:
- explicit ThreadPoolBuilder(std::string name);
+ explicit ThreadPoolBuilder(std::string name, std::string workload_group =
"");
// Note: We violate the style guide by returning mutable references here
// in order to provide traditional Builder pattern conveniences.
@@ -132,6 +137,7 @@ public:
private:
friend class ThreadPool;
const std::string _name;
+ const std::string _workload_group;
int _min_threads;
int _max_threads;
int _max_queue_size;
@@ -255,6 +261,11 @@ public:
return _total_queued_tasks;
}
+ int get_max_queue_size() const {
+ std::lock_guard<std::mutex> l(_lock);
+ return _max_queue_size;
+ }
+
std::vector<int> debug_info() const {
std::lock_guard<std::mutex> l(_lock);
std::vector<int> arr = {_num_threads,
static_cast<int>(_threads.size()), _min_threads,
@@ -280,7 +291,7 @@ private:
std::shared_ptr<Runnable> runnable;
// Time at which the entry was submitted to the pool.
- std::chrono::time_point<std::chrono::system_clock> submit_time;
+ MonotonicStopWatch submit_time_wather;
};
// Creates a new thread pool using a builder.
@@ -308,6 +319,7 @@ private:
void release_token(ThreadPoolToken* t);
const std::string _name;
+ const std::string _workload_group;
int _min_threads;
int _max_threads;
const int _max_queue_size;
@@ -392,6 +404,20 @@ private:
// ExecutionMode::CONCURRENT token used by the pool for tokenless
submission.
std::unique_ptr<ThreadPoolToken> _tokenless;
+ const UniqueId _id;
+
+ std::shared_ptr<MetricEntity> _metric_entity;
+ IntGauge* thread_pool_active_threads = nullptr;
+ IntGauge* thread_pool_queue_size = nullptr;
+ IntGauge* thread_pool_max_queue_size = nullptr;
+ IntGauge* thread_pool_max_threads = nullptr;
+ IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr;
+ IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr;
+
+ IntervalHistogramStat<int64_t> _task_execution_time_ns_statistic {1000};
+ IntervalHistogramStat<int64_t> _task_wait_worker_time_ns_statistic {1000};
+
+ IntCounter* thread_pool_submit_failed = nullptr;
};
// Entry point for token-based task submission and blocking for a particular
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index f419f58037a..1b14d172790 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -56,24 +56,9 @@
namespace doris::vectorized {
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_thread_num,
MetricUnit::NOUNIT);
-
ScannerScheduler::ScannerScheduler() = default;
-ScannerScheduler::~ScannerScheduler() {
- if (!_is_init) {
- return;
- }
-
- _deregister_metrics();
-}
+ScannerScheduler::~ScannerScheduler() = default;
void ScannerScheduler::stop() {
if (!_is_init) {
@@ -116,7 +101,6 @@ Status ScannerScheduler::init(ExecEnv* env) {
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_limited_scan_thread_pool));
- _register_metrics();
_is_init = true;
return Status::OK();
}
@@ -141,11 +125,6 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func([scanner_ref = scan_task,
ctx]() {
- DorisMetrics::instance()->scanner_task_queued->increment(-1);
- DorisMetrics::instance()->scanner_task_running->increment(1);
- Defer metrics_defer(
- [&] {
DorisMetrics::instance()->scanner_task_running->increment(-1); });
-
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
@@ -171,11 +150,6 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
auto sumbit_task = [&]() {
SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
auto work_func = [scanner_ref = scan_task, ctx]() {
- DorisMetrics::instance()->scanner_task_queued->increment(-1);
- DorisMetrics::instance()->scanner_task_running->increment(1);
- Defer metrics_defer(
- [&] {
DorisMetrics::instance()->scanner_task_running->increment(-1); });
-
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
@@ -348,32 +322,6 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->push_back_scan_task(scan_task);
}
-void ScannerScheduler::_register_metrics() {
- REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
- [this]() { return
_local_scan_thread_pool->get_queue_size(); });
- REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num,
- [this]() { return
_local_scan_thread_pool->get_active_threads(); });
- REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
- [this]() { return
_remote_scan_thread_pool->get_queue_size(); });
- REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
- [this]() { return
_remote_scan_thread_pool->get_active_threads(); });
- REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
- [this]() { return
_limited_scan_thread_pool->get_queue_size(); });
- REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
- [this]() { return
_limited_scan_thread_pool->num_threads(); });
-}
-
-void ScannerScheduler::_deregister_metrics() {
- DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size);
- DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num);
- DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size);
- DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num);
- DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
- DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
- DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size);
- DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
-}
-
int ScannerScheduler::get_remote_scan_thread_num() {
int remote_max_thread_num =
config::doris_max_remote_scanner_thread_pool_thread_num != -1
?
config::doris_max_remote_scanner_thread_pool_thread_num
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 7731b3ba8f9..e94659c79d1 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -114,8 +114,12 @@ struct SimplifiedScanTask {
class SimplifiedScanScheduler {
public:
- SimplifiedScanScheduler(std::string sched_name,
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
- : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl),
_sched_name(sched_name) {}
+    // Constructs a scheduler in the not-stopped state (_is_stop == false).
+    //
+    // sched_name      stored in _sched_name; start() passes it to ThreadPoolBuilder
+    //                 and submit_scan_task() uses it in its shutdown error text.
+    // cgroup_cpu_ctl  stored as a std::weak_ptr, so this object does not extend
+    //                 the controller's lifetime.
+    // workload_group  stored in _workload_group and handed to ThreadPoolBuilder as
+    //                 its second argument in start(); defaults to "system".
+    //
+    // Both strings are sink parameters: they are taken by value and moved into
+    // the members so an rvalue argument is never copied a second time.
+    SimplifiedScanScheduler(std::string sched_name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl,
+                            std::string workload_group = "system")
+            : _is_stop(false),
+              _cgroup_cpu_ctl(cgroup_cpu_ctl),
+              _sched_name(std::move(sched_name)),
+              _workload_group(std::move(workload_group)) {}
~SimplifiedScanScheduler() {
stop();
@@ -129,7 +133,7 @@ public:
}
Status start(int max_thread_num, int min_thread_num, int queue_size) {
- RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name)
+ RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group)
.set_min_threads(min_thread_num)
.set_max_threads(max_thread_num)
.set_max_queue_size(queue_size)
@@ -140,13 +144,7 @@ public:
Status submit_scan_task(SimplifiedScanTask scan_task) {
if (!_is_stop) {
- DorisMetrics::instance()->scanner_task_queued->increment(1);
- auto st = _scan_thread_pool->submit_func([scan_task] {
scan_task.scan_func(); });
- if (!st.ok()) {
- DorisMetrics::instance()->scanner_task_queued->increment(-1);
-
DorisMetrics::instance()->scanner_task_submit_failed->increment(1);
- }
- return st;
+ return _scan_thread_pool->submit_func([scan_task] {
scan_task.scan_func(); });
} else {
return Status::InternalError<false>("scanner pool {} is
shutdown.", _sched_name);
}
@@ -216,6 +214,7 @@ private:
std::atomic<bool> _is_stop;
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
std::string _sched_name;
+ std::string _workload_group;
};
} // namespace doris::vectorized
diff --git a/be/test/io/fs/buffered_reader_test.cpp
b/be/test/io/fs/buffered_reader_test.cpp
index 658c98ba514..1038f056aba 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -36,7 +36,10 @@ namespace doris {
using io::FileReader;
class BufferedReaderTest : public testing::Test {
public:
- BufferedReaderTest() {
+ BufferedReaderTest() = default;
+
+protected:
+ void SetUp() override {
std::unique_ptr<ThreadPool> _pool;
static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
.set_min_threads(5)
@@ -44,10 +47,9 @@ public:
.build(&_pool));
ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool =
std::move(_pool);
}
-
-protected:
- virtual void SetUp() {}
- virtual void TearDown() {}
+ void TearDown() override {
+ ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool.reset();
+ }
};
class SyncLocalFileReader : public io::FileReader {
diff --git a/be/test/io/fs/remote_file_system_test.cpp
b/be/test/io/fs/remote_file_system_test.cpp
index c5d80d1b65d..309a31eb97f 100644
--- a/be/test/io/fs/remote_file_system_test.cpp
+++ b/be/test/io/fs/remote_file_system_test.cpp
@@ -410,10 +410,10 @@ TEST_F(RemoteFileSystemTest, TestHdfsFileSystem) {
TEST_F(RemoteFileSystemTest, TestS3FileSystem) {
std::unique_ptr<ThreadPool> _pool;
- ThreadPoolBuilder("S3FileUploadThreadPool")
- .set_min_threads(5)
- .set_max_threads(10)
- .build(&_pool);
+ std::ignore = ThreadPoolBuilder("S3FileUploadThreadPool")
+ .set_min_threads(5)
+ .set_max_threads(10)
+ .build(&_pool);
ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_pool);
S3Conf s3_conf;
S3URI s3_uri(s3_location);
@@ -563,6 +563,7 @@ TEST_F(RemoteFileSystemTest, TestS3FileSystem) {
std::string download_content;
CHECK_STATUS_OK(fs->direct_download(direct_remote_file,
&download_content));
ASSERT_EQ("abc", download_content);
+ ExecEnv::GetInstance()->_s3_file_upload_thread_pool.reset();
}
} // namespace doris
diff --git a/be/test/io/fs/s3_file_writer_test.cpp
b/be/test/io/fs/s3_file_writer_test.cpp
index 7021346a704..6469559d0d8 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -306,6 +306,7 @@ public:
sp->clear_call_back(mockcallback.point_name);
});
sp->disable_processing();
+ ExecEnv::GetInstance()->_s3_file_upload_thread_pool.reset();
}
};
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp
b/be/test/olap/rowset/beta_rowset_test.cpp
index 0c3001758f0..2e13436b3d3 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -236,6 +236,8 @@ TEST_F(BetaRowsetTest, ReadTest) {
.region = "region",
.ak = "ak",
.sk = "sk",
+ .token = "",
+ .bucket = "",
}};
std::string resource_id = "10000";
auto res = io::S3FileSystem::create(std::move(s3_conf),
io::FileSystem::TMP_FS_ID);
diff --git a/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
b/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
index aeb6d7bc8ae..7ef95e938e1 100644
--- a/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
+++ b/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
@@ -123,6 +123,7 @@ TEST_F(UniqueRowsetIdGeneratorTest, GenerateIdBenchmark) {
hi <<= 56;
RowsetId last_id = id_generator.next_id();
EXPECT_EQ(last_id.hi, hi + kNumThreads * kIdPerThread + 1);
+ pool.reset();
}
} // namespace doris
diff --git a/be/test/testutil/run_all_tests.cpp
b/be/test/testutil/run_all_tests.cpp
index 1208141a8fa..44cccb7fec1 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -50,6 +50,16 @@
int main(int argc, char** argv) {
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
doris::ExecEnv::GetInstance()->init_mem_tracker();
+ // Used for unit test
+ std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool;
+
+ std::ignore = doris::ThreadPoolBuilder("NonBlockCloseThreadPool")
+ .set_min_threads(12)
+ .set_max_threads(48)
+ .build(&non_block_close_thread_pool);
+ doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(
+ std::move(non_block_close_thread_pool));
+
doris::thread_context()->thread_mem_tracker_mgr->init();
std::shared_ptr<doris::MemTrackerLimiter> test_tracker =
doris::MemTrackerLimiter::create_shared(doris::MemTrackerLimiter::Type::GLOBAL,
@@ -96,5 +106,7 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->set_tracking_memory(false);
int res = RUN_ALL_TESTS();
+
+ doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr);
return res;
}
diff --git a/be/test/util/countdown_latch_test.cpp
b/be/test/util/countdown_latch_test.cpp
index ff3c21c6e13..86999ca929d 100644
--- a/be/test/util/countdown_latch_test.cpp
+++ b/be/test/util/countdown_latch_test.cpp
@@ -59,6 +59,7 @@ TEST(TestCountDownLatch, TestLatch) {
EXPECT_TRUE(pool->submit_func(std::bind(decrement_latch, &latch,
1000)).ok());
latch.wait();
EXPECT_EQ(0, latch.count());
+ pool.reset();
}
// Test that resetting to zero while there are waiters lets the waiters
diff --git a/be/test/util/interval_histogram_test.cpp
b/be/test/util/interval_histogram_test.cpp
new file mode 100644
index 00000000000..aa63c360165
--- /dev/null
+++ b/be/test/util/interval_histogram_test.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/interval_histogram.h"
+
+#include <gtest/gtest.h>
+
+#include <thread>
+#include <vector>
+
+namespace doris {
+
+// Single-threaded check of IntervalHistogramStat<int> constructed with 5.
+// Five samples are added and mean/median/max/min are verified; two further
+// samples are then added and the expectations show that the two oldest
+// samples (10 and 20) no longer contribute to the statistics.
+//
+// The suite is named IntervalHistogramStatTest so that it matches ParallelTest
+// in this file and both cases run under
+// --gtest_filter=IntervalHistogramStatTest.*
+TEST(IntervalHistogramStatTest, SerialTest) {
+    IntervalHistogramStat<int> stat(5);
+
+    stat.add(10);
+    stat.add(20);
+    stat.add(30);
+    stat.add(40);
+    stat.add(50);
+
+    // Window holds [10, 20, 30, 40, 50].
+    EXPECT_EQ(stat.mean(), 30);
+    EXPECT_EQ(stat.median(), 30);
+    EXPECT_EQ(stat.max(), 50);
+    EXPECT_EQ(stat.min(), 10);
+
+    // Make window move forward
+    stat.add(60);
+    stat.add(70);
+
+    // window now contains [30, 40, 50, 60, 70]
+    EXPECT_EQ(stat.mean(), 50);
+    EXPECT_EQ(stat.median(), 50);
+    EXPECT_EQ(stat.max(), 70);
+    EXPECT_EQ(stat.min(), 30);
+}
+
+// Concurrent-writer check: ten threads each add ten consecutive integers, so
+// together they add every value in 0..99 exactly once to a histogram that was
+// constructed with 100. Statistics are read only after every writer is joined.
+TEST(IntervalHistogramStatTest, ParallelTest) {
+    constexpr int kWriters = 10;
+    constexpr int kSamplesPerWriter = 10;
+    constexpr int kTotalSamples = kWriters * kSamplesPerWriter;
+    IntervalHistogramStat<int> stat(kTotalSamples);
+
+    std::vector<std::thread> writers;
+    for (int w = 0; w < kWriters; ++w) {
+        // Writer w owns the half-open range [first, first + kSamplesPerWriter).
+        const int first = w * kSamplesPerWriter;
+        writers.emplace_back([&stat, first] {
+            for (int value = first; value < first + kSamplesPerWriter; ++value) {
+                stat.add(value);
+            }
+        });
+    }
+
+    for (auto& writer : writers) {
+        writer.join();
+    }
+
+    // Integer division: (100 - 1) / 2 == 49 is expected for both mean and median.
+    const int expected_middle = (kTotalSamples - 1) / 2;
+    EXPECT_EQ(stat.mean(), expected_middle);
+    EXPECT_EQ(stat.max(), kTotalSamples - 1);
+    EXPECT_EQ(stat.min(), 0);
+    EXPECT_EQ(stat.median(), expected_middle);
+}
+
+} // namespace doris
diff --git a/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp
b/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp
index 3dbf34a4dcb..fe94afe5118 100644
--- a/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp
+++ b/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp
@@ -204,8 +204,8 @@ public:
<< "(" << data_types[0]->get_name() << ")";
AggregateFunctionSimpleFactory factory =
AggregateFunctionSimpleFactory::instance();
- auto agg_function =
- factory.get("linear_histogram", data_types, false, -1,
{.enable_decimal256 = true});
+ auto agg_function = factory.get("linear_histogram", data_types, false,
-1,
+ {.enable_decimal256 = true,
.column_names = {""}});
EXPECT_NE(agg_function, nullptr);
std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index be54bec5e2e..b1bbae2e3ce 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -15,6 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+// Returns the response body of an HTTP GET to http://<ip>:<port>/metrics.
+// The request carries a Basic Authorization header built from the regression
+// config's feHttpUser / feHttpPassword (a null password is sent as "").
+def getMetrics = { ip, port ->
+    def password = context.config.feHttpPassword == null ? "" : context.config.feHttpPassword
+    def credentials = (context.config.feHttpUser + ":" + password).getBytes("UTF-8")
+    def encoding = Base64.getEncoder().encodeToString(credentials)
+
+    def conn = new URL('http://' + ip + ':' + port + "/metrics").openConnection()
+    conn.setRequestMethod("GET")
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    return conn.getInputStream().getText()
+}
+
suite("test_crud_wlg") {
def table_name = "wlg_test_table"
def table_name2 = "wlg_test_table2"
@@ -787,4 +797,50 @@ suite("test_crud_wlg") {
sql "drop workload group if exists default_val_wg"
+ for (int i = 0; i < 20; i++) { // repeat the create / query / scrape / drop cycle 20 times
+ // 1. Look up backend IPs and HTTP ports (SHOW BACKENDS); the helper is handed both maps to fill.
+ Map<String, String> backendId_to_backendIP = new HashMap<>();
+ Map<String, String> backendId_to_backendHttpPort = new HashMap<>();
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+ // Log both maps.
+ logger.info("backendId_to_backendIP: " + backendId_to_backendIP);
+ logger.info("backendId_to_backendHttpPort: " +
backendId_to_backendHttpPort);
+
+ // 2. Recreate workload group test_wg_metrics, switch the session to it, and log its row.
+ sql "drop workload group if exists test_wg_metrics;"
+ sql "create workload group if not exists test_wg_metrics " +
+ "properties ( " +
+ " 'cpu_share'='10', " +
+ " 'memory_limit'='10%', " +
+ " 'enable_memory_overcommit'='true' " +
+ ");"
+ sql "set workload_group=test_wg_metrics;"
+ wg = sql("select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name = 'test_wg_metrics' order
by name;");
+ logger.info("wg: " + wg);
+
+ // 3. EXECUTE A QUERY SO THAT THE WORKLOAD GROUP IS USED
+ sql "select count(*) from numbers(\"number\"=\"100\");"
+
+ // 4. Scrape /metrics from one backend over HTTP.
+ // Use the first backend id returned by the map's key iterator.
+ backendId = backendId_to_backendIP.keySet().iterator().next();
+ backendIP = backendId_to_backendIP.get(backendId);
+ backendHttpPort = backendId_to_backendHttpPort.get(backendId);
+ logger.info("backendId: " + backendId + ", backendIP: " + backendIP +
", backendHttpPort: " + backendHttpPort);
+
+ // Scrape 5 times; every scrape must yield exactly 5 matching metric lines.
+ for (int j = 0; j < 5; j++) {
+ String metrics = getMetrics(backendIP, backendHttpPort);
+ String filteredMetrics = metrics.split('\n').findAll { line ->
+ line.startsWith('doris_be_thread_pool') &&
line.contains('workload_group="test_wg_metrics"') &&
line.contains('thread_pool_name="Scan_test_wg_metrics"')
+ }.join('\n')
+ // Log the lines kept by the filter above, then count the non-blank ones.
+ logger.info("filteredMetrics: " + filteredMetrics);
+ List<String> lines = filteredMetrics.split('\n').findAll {
it.trim() }
+ assert lines.size() == 5
+ }
+
+ sql "drop workload group if exists test_wg_metrics;" // drop the group before the next iteration recreates it
+ }
+ sql "drop workload group if exists test_wg_metrics;"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]