This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fc8615c37eb4e28f3cc6bea0fcd5a8732451e883 Author: Alexey Serbin <[email protected]> AuthorDate: Mon Aug 10 18:16:53 2020 -0700 KUDU-1587 part 1: load meter for ThreadPool This patch introduces a load meter for ThreadPool, aiming to use active queue management techniques (AQM) such as CoDel [1] in scenarios where thread pool queue load metrics are applicable (e.g., KUDU-1587). [1] https://en.wikipedia.org/wiki/CoDel Change-Id: I640716dc32f193e68361ca623ee7b9271e661d8b Reviewed-on: http://gerrit.cloudera.org:8080/16332 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/util/threadpool-test.cc | 274 +++++++++++++++++++++++++++++++++++++-- src/kudu/util/threadpool.cc | 203 ++++++++++++++++++++++++++--- src/kudu/util/threadpool.h | 154 ++++++++++++++++++++++ 3 files changed, 603 insertions(+), 28 deletions(-) diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc index 93806f8..cecd01b 100644 --- a/src/kudu/util/threadpool-test.cc +++ b/src/kudu/util/threadpool-test.cc @@ -162,10 +162,12 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) { } TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) { - ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) - .set_min_threads(0) - .set_max_threads(3) - .set_idle_timeout(MonoDelta::FromMilliseconds(1)))); + constexpr int kIdleTimeoutMs = 1; + ASSERT_OK(RebuildPoolWithBuilder( + ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(3) + .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs)))); // There are no threads to start with. ASSERT_TRUE(pool_->num_threads() == 0); @@ -186,6 +188,11 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) { latch.CountDown(); pool_->Wait(); ASSERT_EQ(0, pool_->active_threads_); + // Wait for more than idle timeout: so threads should be gone since + // min_threads is set to 0. 
+ SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs)); + ASSERT_EQ(0, pool_->num_threads()); + ASSERT_EQ(0, pool_->active_threads_); pool_->Shutdown(); ASSERT_EQ(0, pool_->num_threads()); } @@ -252,10 +259,12 @@ TEST_F(ThreadPoolTest, TestRace) { } TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) { - ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) - .set_min_threads(1) - .set_max_threads(4) - .set_idle_timeout(MonoDelta::FromMilliseconds(1)))); + constexpr int kIdleTimeoutMs = 1; + ASSERT_OK(RebuildPoolWithBuilder( + ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(4) + .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs)))); // There is 1 thread to start with. ASSERT_EQ(1, pool_->num_threads()); @@ -276,6 +285,9 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) { latch.CountDown(); pool_->Wait(); ASSERT_EQ(0, pool_->active_threads_); + SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs)); + ASSERT_EQ(0, pool_->active_threads_); + ASSERT_EQ(1, pool_->num_threads()); pool_->Shutdown(); ASSERT_EQ(0, pool_->num_threads()); } @@ -448,6 +460,252 @@ TEST_F(ThreadPoolTest, TestMetrics) { ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount()); } +// Test scenario to verify the functionality of the QueueLoadMeter. +TEST_F(ThreadPoolTest, QueueLoadMeter) { + const auto kQueueTimeThresholdMs = 100; + const auto kIdleThreadTimeoutMs = 200; + constexpr auto kMaxThreads = 3; + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(kMaxThreads) + .set_queue_overload_threshold(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)) + .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleThreadTimeoutMs)))); + // An idle pool must not have its queue overloaded. + ASSERT_FALSE(pool_->QueueOverloaded()); + + // One instant tasks cannot make pool's queue overloaded. 
+ ASSERT_OK(pool_->Submit([](){})); + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + + for (auto i = 0; i < kMaxThreads; ++i) { + ASSERT_OK(pool_->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs)); + })); + } + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + + for (auto i = 0; i < 2 * kMaxThreads; ++i) { + ASSERT_OK(pool_->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs)); + })); + } + ASSERT_FALSE(pool_->QueueOverloaded()); + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10)); + ASSERT_TRUE(pool_->QueueOverloaded()); + // Should still be overloaded after first kMaxThreads tasks are processed. + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10)); + ASSERT_TRUE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + + // Many instant tasks cannot make pool overloaded. + for (auto i = 0; i < kMaxThreads; ++i) { + ASSERT_OK(pool_->Submit([](){})); + } + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + // Wait for the threads to be shut down due to inactivity. + SleepFor(MonoDelta::FromMilliseconds(2 * kIdleThreadTimeoutMs)); + // Even if all threads are shutdown, an idle pool with empty queue should not + be overloaded. + ASSERT_FALSE(pool_->QueueOverloaded()); + + // Shovel some light tasks once again: this should not overload the queue. + for (auto i = 0; i < 10 * kMaxThreads; ++i) { + ASSERT_OK(pool_->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(1)); + })); + } + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + + // Submit a bunch of instant tasks via a single token: the queue should not + become overloaded. 
+ { + unique_ptr<ThreadPoolToken> t = pool_->NewToken( + ThreadPool::ExecutionMode::SERIAL); + ASSERT_OK(t->Submit([](){})); + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + + for (auto i = 0; i < 100; ++i) { + ASSERT_OK(t->Submit([](){})); + } + ASSERT_FALSE(pool_->QueueOverloaded()); + SleepFor(MonoDelta::FromMilliseconds(1)); + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + } + + // Submit many instant tasks via multiple tokens (more than the maximum + // number of worker threads in a pool) and many lightweight tasks which can + // run concurrently: the queue should not become overloaded. + { + constexpr auto kNumTokens = 2 * kMaxThreads; + vector<unique_ptr<ThreadPoolToken>> tokens; + tokens.reserve(kNumTokens); + for (auto i = 0; i < kNumTokens; ++i) { + tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)); + } + + for (auto& t : tokens) { + for (auto i = 0; i < 50; ++i) { + ASSERT_OK(t->Submit([](){})); + } + for (auto i = 0; i < 10; ++i) { + ASSERT_OK(pool_->Submit([](){})); + } + } + ASSERT_FALSE(pool_->QueueOverloaded()); + SleepFor(MonoDelta::FromMilliseconds(1)); + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + } + + // Submit many long running tasks via serial tokens where the number of tokens + // is less than the maximum number of worker threads in the pool. The queue + // of the pool should not become overloaded since the pool has one spare + // thread to spawn. 
+ { + constexpr auto kNumTokens = kMaxThreads - 1; + ASSERT_GT(kNumTokens, 0); + vector<unique_ptr<ThreadPoolToken>> tokens; + tokens.reserve(kNumTokens); + for (auto i = 0; i < kNumTokens; ++i) { + tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)); + } + + ASSERT_FALSE(pool_->QueueOverloaded()); + for (auto& t : tokens) { + for (auto i = 0; i < kMaxThreads; ++i) { + ASSERT_OK(t->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)); + })); + } + } + ASSERT_FALSE(pool_->QueueOverloaded()); + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)); + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + } + + // Submit many long running tasks via serial tokens where the number of tokens + // is greater or equal to the maximum number of worker threads in the pool. + // The queue of the pool should become overloaded since the pool is running + // at its capacity and queue times are over the threshold. + { + constexpr auto kNumTokens = kMaxThreads; + vector<unique_ptr<ThreadPoolToken>> tokens; + tokens.reserve(kNumTokens); + for (auto i = 0; i < kNumTokens; ++i) { + tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)); + } + + ASSERT_FALSE(pool_->QueueOverloaded()); + for (auto& t : tokens) { + for (auto i = 0; i < kMaxThreads; ++i) { + ASSERT_OK(t->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)); + })); + } + } + // Since there is exactly kMaxThreads serial pool tokens with tasks, + // the queue is empty most of the time. This is because active serial tokens + // are not kept in the queue. So, the status of the queue cannot be reliably + // determined by peeking at the submission times of the elements in the + // queue. Then the only way to detect overload of the queue is the history + // of queue times. The latter will reflect long queue times only after + // processing two tasks in each of the serial tokens. 
So, it's expected + // to get a stable report on the queue status only after two + // kQueueTimeThresholdMs time intervals. + SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs)); + ASSERT_TRUE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + } + + // A mixed case: submit many long running tasks via serial tokens where the + // number of tokens is less than the maximum number of worker threads in the + // pool and submit many instant tasks that can run concurrently. + { + constexpr auto kNumTokens = kMaxThreads - 1; + ASSERT_GT(kNumTokens, 0); + vector<unique_ptr<ThreadPoolToken>> tokens; + tokens.reserve(kNumTokens); + for (auto i = 0; i < kNumTokens; ++i) { + tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)); + } + + ASSERT_FALSE(pool_->QueueOverloaded()); + for (auto& t : tokens) { + for (auto i = 0; i < kMaxThreads; ++i) { + ASSERT_OK(t->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)); + })); + } + } + ASSERT_FALSE(pool_->QueueOverloaded()); + + // Add several light tasks in addition to the scheduled serial ones. This + // should not overload the queue. + for (auto i = 0; i < 10; ++i) { + ASSERT_OK(pool_->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(1)); + })); + } + ASSERT_FALSE(pool_->QueueOverloaded()); + SleepFor(MonoDelta::FromMilliseconds(1)); + ASSERT_FALSE(pool_->QueueOverloaded()); + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)); + ASSERT_FALSE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + } + + // Another mixed case: submit many long running tasks via a serial token + // and many long running tasks that can run concurrently. The queue should + // become overloaded. 
+ { + constexpr auto kNumTokens = 1; + vector<unique_ptr<ThreadPoolToken>> tokens; + tokens.reserve(kNumTokens); + for (auto i = 0; i < kNumTokens; ++i) { + tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)); + } + + ASSERT_FALSE(pool_->QueueOverloaded()); + for (auto& t : tokens) { + for (auto i = 0; i < kMaxThreads; ++i) { + ASSERT_OK(t->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)); + })); + } + } + ASSERT_FALSE(pool_->QueueOverloaded()); + + // Add several long-running tasks in addition to the scheduled serial ones. This + should overload the queue. + for (auto i = 0; i < 2 * kMaxThreads; ++i) { + ASSERT_OK(pool_->Submit([](){ + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)); + })); + } + SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs)); + ASSERT_TRUE(pool_->QueueOverloaded()); + pool_->Wait(); + ASSERT_FALSE(pool_->QueueOverloaded()); + } +} + // Test that a thread pool will crash if asked to run its own blocking // functions in a pool thread. 
// diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc index 9d8cc4a..6e2097d 100644 --- a/src/kudu/util/threadpool.cc +++ b/src/kudu/util/threadpool.cc @@ -17,6 +17,7 @@ #include "kudu/util/threadpool.h" +#include <algorithm> #include <cstdint> #include <deque> #include <functional> @@ -24,7 +25,6 @@ #include <memory> #include <ostream> #include <string> -#include <utility> #include <glog/logging.h> @@ -90,6 +90,12 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) { return *this; } +ThreadPoolBuilder& ThreadPoolBuilder::set_queue_overload_threshold( + const MonoDelta& threshold) { + queue_overload_threshold_ = threshold; + return *this; +} + Status ThreadPoolBuilder::Build(unique_ptr<ThreadPool>* pool) const { pool->reset(new ThreadPool(*this)); return (*pool)->Init(); @@ -266,20 +272,20 @@ const char* ThreadPoolToken::StateToString(State s) { //////////////////////////////////////////////////////// ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) - : name_(builder.name_), - min_threads_(builder.min_threads_), - max_threads_(builder.max_threads_), - max_queue_size_(builder.max_queue_size_), - idle_timeout_(builder.idle_timeout_), - pool_status_(Status::Uninitialized("The pool was not initialized.")), - idle_cond_(&lock_), - no_threads_cond_(&lock_), - num_threads_(0), - num_threads_pending_start_(0), - active_threads_(0), - total_queued_tasks_(0), - tokenless_(NewToken(ExecutionMode::CONCURRENT)), - metrics_(builder.metrics_) { + : name_(builder.name_), + min_threads_(builder.min_threads_), + max_threads_(builder.max_threads_), + max_queue_size_(builder.max_queue_size_), + idle_timeout_(builder.idle_timeout_), + pool_status_(Status::Uninitialized("The pool was not initialized.")), + idle_cond_(&lock_), + no_threads_cond_(&lock_), + num_threads_(0), + num_threads_pending_start_(0), + active_threads_(0), + total_queued_tasks_(0), + tokenless_(NewToken(ExecutionMode::CONCURRENT)), + metrics_(builder.metrics_) { 
string prefix = !builder.trace_metric_prefix_.empty() ? builder.trace_metric_prefix_ : builder.name_; @@ -289,6 +295,11 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) prefix + ".run_wall_time_us"); run_cpu_time_trace_metric_name_ = TraceMetrics::InternName( prefix + ".run_cpu_time_us"); + + const auto& ovt = builder.queue_overload_threshold_; + if (ovt.Initialized() && ovt.ToNanoseconds() > 0) { + load_meter_.reset(new QueueLoadMeter(*this, ovt, max_threads_)); + } } ThreadPool::~ThreadPool() { @@ -397,6 +408,17 @@ unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics( return t; } +bool ThreadPool::QueueOverloaded(MonoDelta* overloaded_time, + MonoDelta* threshold) const { + if (!load_meter_) { + return false; + } + if (threshold) { + *threshold = load_meter_->queue_time_threshold(); + } + return load_meter_->overloaded(overloaded_time); +} + void ThreadPool::ReleaseToken(ThreadPoolToken* t) { MutexLock guard(lock_); CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released", @@ -410,7 +432,7 @@ Status ThreadPool::Submit(std::function<void()> f) { Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) { DCHECK(token); - MonoTime submit_time = MonoTime::Now(); + const MonoTime submit_time = MonoTime::Now(); MutexLock guard(lock_); if (PREDICT_FALSE(!pool_status_.ok())) { @@ -493,6 +515,7 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) { idle_threads_.front().not_empty.Signal(); idle_threads_.pop_front(); } + NotifyLoadMeterUnlocked(); guard.Unlock(); if (metrics_.queue_length_histogram) { @@ -518,7 +541,6 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) { } } - return Status::OK(); } @@ -570,6 +592,7 @@ void ThreadPool::DispatchThread() { // // Note: if FIFO behavior is desired, it's as simple as changing this to push_back(). idle_threads_.push_front(me); + NotifyLoadMeterUnlocked(); SCOPED_CLEANUP({ // For some wake ups (i.e. 
Shutdown or DoSubmit) this thread is // guaranteed to be unlinked after being awakened. In others (i.e. @@ -609,6 +632,10 @@ void ThreadPool::DispatchThread() { --total_queued_tasks_; ++active_threads_; + const MonoTime now(MonoTime::Now()); + const MonoDelta queue_time = now - task.submit_time; + NotifyLoadMeterUnlocked(queue_time); + unique_lock.Unlock(); // Release the reference which was held by the queued item. @@ -617,9 +644,8 @@ void ThreadPool::DispatchThread() { task.trace->Release(); } - // Update metrics - MonoTime now(MonoTime::Now()); - int64_t queue_time_us = (now - task.submit_time).ToMicroseconds(); + // Update metrics. + const int64_t queue_time_us = queue_time.ToMicroseconds(); TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us); if (metrics_.queue_time_us_histogram) { metrics_.queue_time_us_histogram->Increment(queue_time_us); @@ -673,6 +699,15 @@ void ThreadPool::DispatchThread() { queue_.emplace_back(token); } } + if (!queue_.empty()) { + // If the queue is empty, the LoadMeter is notified on next iteration of + // the outer while() loop under the 'if (queue_.empty())' clause. Here + // it's crucial to call NotifyLoadMeter() _before_ decrementing + // 'active_threads_' to avoid reporting this thread as a spare one despite + // the fact that it will run a task from the non-empty queue immediately + // on next iteration of the outer while() loop. 
+ NotifyLoadMeterUnlocked(); + } if (--active_threads_ == 0) { idle_cond_.Broadcast(); } @@ -709,6 +744,134 @@ void ThreadPool::CheckNotPoolThreadUnlocked() { } } +void ThreadPool::NotifyLoadMeterUnlocked(const MonoDelta& queue_time) { + if (!load_meter_) { + return; + } + + lock_.AssertAcquired(); + MonoTime queue_head_submit_time; + if (!queue_.empty()) { + DCHECK(!queue_.front()->entries_.empty()); + queue_head_submit_time = queue_.front()->entries_.front().submit_time; + } + load_meter_->UpdateQueueInfoUnlocked(queue_time, + queue_head_submit_time, + active_threads_ < max_threads_); +} + +//////////////////////////////////////////////////////// +// ThreadPool::QueueLoadMeter +//////////////////////////////////////////////////////// + +ThreadPool::QueueLoadMeter::QueueLoadMeter( + const ThreadPool& tpool, + const MonoDelta& queue_time_threshold, + size_t queue_time_history_length) + : tpool_(tpool), + queue_time_threshold_(queue_time_threshold), + queue_time_history_length_(queue_time_history_length), + queue_head_submit_time_(MonoTime()), + overloaded_since_(MonoTime()), + has_spare_thread_(true) { +} + +bool ThreadPool::QueueLoadMeter::overloaded(MonoDelta* time_overloaded) { + // First, check whether the queue was overloaded upon the latest update + // on the queue from the ThreadPool's activity. If that's the case, there is + // no need to dig further. + MonoTime overloaded_since = overloaded_since_.load(); + if (overloaded_since.Initialized()) { + if (time_overloaded) { + *time_overloaded = MonoTime::Now() - overloaded_since; + } + return true; + } + + // Even if the queue was not overloaded upon the latest update on the + // ThreadPool's activity, the queue might have stalled because all its worker + // threads have been busy for long time. If so, 'overloaded_since_' hasn't + // been updated by the activity on the thread pool itself. 
However, it's + // possible to detect whether the queue has stalled by checking if the task + // at the head of the queue hasn't been dispatched for longer than + // queue_time_threshold_ time interval. + MonoTime queue_head_submit_time = queue_head_submit_time_.load(); + if (!queue_head_submit_time.Initialized() || has_spare_thread_.load()) { + return false; + } + + const auto now = MonoTime::Now(); + const MonoDelta queue_time = now - queue_head_submit_time; + if (queue_time > queue_time_threshold_) { + MonoTime overloaded_since; + if (overloaded_since_.compare_exchange_strong(overloaded_since, now)) { + VLOG(3) << Substitute("state transition: normal --> overloaded"); + if (time_overloaded) { + *time_overloaded = queue_time - queue_time_threshold_; + } + return true; + } + DCHECK(overloaded_since.Initialized()); + if (time_overloaded) { + *time_overloaded = now - overloaded_since; + } + return true; + } + + return false; +} + +void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked( + const MonoDelta& task_queue_time, + const MonoTime& queue_head_submit_time, + bool has_spare_thread) { + tpool_.lock_.AssertAcquired(); + if (task_queue_time.Initialized()) { + // TODO(aserbin): any better way of tracking the running minimum of N numbers? + queue_times_.emplace_back(task_queue_time); + queue_times_ordered_.insert(task_queue_time); + if (queue_times_.size() > queue_time_history_length_) { + const auto& elem = queue_times_.front(); + auto it = queue_times_ordered_.find(elem); + DCHECK(it != queue_times_ordered_.end()); + queue_times_ordered_.erase(it); + queue_times_.pop_front(); + } + min_queue_time_ = *queue_times_.begin(); + } + queue_head_submit_time_.store(queue_head_submit_time); + has_spare_thread_.store(has_spare_thread); + + // Update the load state of the queue, storing appropriate information + // in the 'overloaded_since_' field. 
+ const auto now = MonoTime::Now(); + const bool queue_empty = !queue_head_submit_time.Initialized(); + const auto queue_time = queue_empty + ? MonoDelta::FromSeconds(0) : now - queue_head_submit_time; + const auto min_queue_time = min_queue_time_.Initialized() + ? min_queue_time_ : MonoDelta::FromSeconds(0); + const bool was_overloaded = overloaded_since_.load().Initialized(); + const bool overloaded = !has_spare_thread && + std::max(min_queue_time, queue_time) > queue_time_threshold_; + // Track the state transitions and update overloaded_since_. + if (!was_overloaded && overloaded) { + VLOG(3) << Substitute("state transition: normal --> overloaded"); + overloaded_since_.store(now); + } else if (was_overloaded && !overloaded) { + VLOG(3) << Substitute("state transition: overloaded --> normal"); + overloaded_since_.store(MonoTime()); + } + VLOG(4) << Substitute("state refreshed: overloaded since $0, queue $1, " + "has $2 thread, queue head submit time $3, " + "queue time $4, min queue time $5", + overloaded_since_.load().ToString(), + queue_empty ? "empty" : "not empty", + has_spare_thread ? "spare" : "no spare", + queue_head_submit_time.ToString(), + queue_time.ToString(), + min_queue_time.ToString()); +} + std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) { return o << ThreadPoolToken::StateToString(s); } diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h index a1037e8..403acbd 100644 --- a/src/kudu/util/threadpool.h +++ b/src/kudu/util/threadpool.h @@ -16,10 +16,13 @@ // under the License. #pragma once +#include <atomic> +#include <cstddef> #include <deque> #include <functional> #include <iosfwd> #include <memory> +#include <set> #include <string> #include <unordered_set> @@ -43,6 +46,7 @@ class ThreadPool; class ThreadPoolToken; class Trace; + // Interesting thread pool metrics. Can be applied to the entire pool (see // ThreadPoolBuilder) or to individual tokens. 
struct ThreadPoolMetrics { @@ -112,6 +116,7 @@ class ThreadPoolBuilder { ThreadPoolBuilder& set_max_threads(int max_threads); ThreadPoolBuilder& set_max_queue_size(int max_queue_size); ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout); + ThreadPoolBuilder& set_queue_overload_threshold(const MonoDelta& threshold); ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics); // Instantiate a new ThreadPool with the existing builder arguments. @@ -125,6 +130,7 @@ class ThreadPoolBuilder { int max_threads_; int max_queue_size_; MonoDelta idle_timeout_; + MonoDelta queue_overload_threshold_; ThreadPoolMetrics metrics_; DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder); @@ -215,6 +221,13 @@ class ThreadPool { return num_threads_ + num_threads_pending_start_; } + // Whether the ThreadPool's queue is overloaded. If queue overload threshold + // isn't set, returns 'false'. Otherwise, returns whether the queue is + // overloaded. If overloaded, 'overloaded_time' and 'threshold' are both + // populated with corresponding information (if not null). + bool QueueOverloaded(MonoDelta* overloaded_time = nullptr, + MonoDelta* threshold = nullptr) const; + private: FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum); FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool); @@ -231,6 +244,127 @@ class ThreadPool { MonoTime submit_time; }; + // This utility class is used to track how busy the ThreadPool is by + // monitoring its task queue timings. This class uses logic modeled after + // CoDel algorithm (see [1], [2], [3]), detecting whether the queue + // of a thread pool with the maximum of M worker threads is overloaded + // (it's assumed M > 0). The idea is to keep an eye on the queue times when + // the thread pool works at its full capacity. Even if there aren't any idle + // threads, we don't want to declare the queue overloaded when it's still able + // to dispatch many lightweight tasks pretty fast once they arrived. 
Instead, + // we start declaring the queue overloaded only when we see the evidence of + // the newly arrived tasks being stuck in the queue for a time interval longer + // than the configured threshold. + // + // Let's denote the minimum of the queue times of the last N tasks dispatched + // by min(QT_historic(N)). If the history of already dispatched tasks is + // empty, min(QT_historic(N)) is defined to be 0. The time interval that the + // very first element of the queue has been waiting to be dispatched is + // denoted by QT_head. The queue time threshold is denoted by T_overload. + // + // With that, the criterion to detect the 'overloaded' state of a ThreadPool's + // queue is defined as the following: + // + // all available worker threads are busy + // AND + // max(QT_head, min(QT_historic(M))) > T_overload + // + // The maximum number of worker threads (M) in a thread pool naturally + // provides the proper length of the queue time history. To illustrate, let's + // examine one edge case. It's a case of continuous periodic workload of + // batches of M tasks, where all M tasks in a batch are scheduled at the same + // time and batches of M tasks are separated by T_overload time interval. + // Every batch contain (M - 1) heavyweight tasks and a single lightweight one. + // Let's assume it takes T_overload to complete a heavyweight task, and a + // lightweight one completes instantly. With such a workload running against a + // thread pool, it's able to handle many extra lightweight tasks with + // resulting queue times well under T_overload threshold. Apparently, the + // queue should not be declared overloaded in such case. So, if the queue time + // history length were less than M (e.g. 
(M - 1)), then the pool would be + // considered overloaded if capturing a moment when all worker threads are + // busy handling a newly arrived batch of M tasks, assuming the thread pool + // has already handled at least two batches of tasks since its start (the + // history of queue times would be { 0, T_overload, ..., T_overload } repeated + // many times). + // + // From the other side, if the size of the queue time history were greater + // than M, it would include not-so-relevant information for some patterns + // of scheduled tasks. + // + // References: + // [1] https://queue.acm.org/detail.cfm?id=2209336 + // [2] https://en.wikipedia.org/wiki/CoDel + // [3] https://man7.org/linux/man-pages/man8/CoDel.8.html + class QueueLoadMeter { + public: + // Create an instance of QueueLoadMeter class. The pool to attach to + // is specified by the 'tpool' parameter. The 'queue_time_threshold' + // parameter corresponds to 'T_overload' parameter from the algorithm + // description above, and 'queue_time_history_length' corresponds to 'N', + // respectively. + explicit QueueLoadMeter(const ThreadPool& tpool, + const MonoDelta& queue_time_threshold, + size_t queue_time_history_length); + + const MonoDelta& queue_time_threshold() const { + return queue_time_threshold_; + } + + // Check whether the queue is overloaded. If the queue is not overloaded, + // this method returns 'false'. If the queue is overloaded, this method + // return 'true'. In the latter case, if 'time_overloaded' is not null, + // it is populated with the information on how long the queue has been + // in the overloaded state. This method is lock-free. + bool overloaded(MonoDelta* time_overloaded = nullptr); + + // Notify the meter about updates on the task queue. If a task being + // dequeued, the queue time of the task dequeued is specified via the + // 'task_queue_time' parameter, otherwise it's not initialized. 
+ // Non-initialized 'queue_head_submit_time' means there isn't a next task + in the queue. The 'has_spare_thread' parameter conveys information + on whether a "spare" worker thread is available. + void UpdateQueueInfoUnlocked(const MonoDelta& task_queue_time, + const MonoTime& queue_head_submit_time, + bool has_spare_thread); + private: + // The pool this meter is attached to. + const ThreadPool& tpool_; + + // The threshold for the minimum queue times when determining whether the + thread pool's tasks queue is in overloaded state. This corresponds to the + parameter 'T_overload' in the description of the algorithm above. + const MonoDelta queue_time_threshold_; + + // The measurement window to track task queue times. That's the number + of the most recent tasks to check for the queue times. This corresponds + to the parameter 'N' in the description of the algorithm above. + const size_t queue_time_history_length_; + + // Queue timings of the most recent samples. The size of these containers + is kept under queue_time_history_length_ limit. + std::deque<MonoDelta> queue_times_; + std::multiset<MonoDelta> queue_times_ordered_; + MonoDelta min_queue_time_; + + // Below fields are to store the latest snapshot of the information about + the task queue of the pool the meter is attached to. + + // Time when the next queue task was submitted. Set to empty MonoTime() if + there isn't a single element in the queue (i.e. Initialized() returns + false). + std::atomic<MonoTime> queue_head_submit_time_; + + // Time when the queue has entered the overloaded state. If the queue isn't + in overloaded state, this member field isn't initialized + (i.e. overloaded_since_.Initialized() returns false). + std::atomic<MonoTime> overloaded_since_; + + // Whether the ThreadPool has at least one "spare" thread: a thread that can + be spawned before reaching the maximum allowed number of threads, + or one of those already spawned but currently idle. 
+ std::atomic<bool> has_spare_thread_; + }; + // Creates a new thread pool using a builder. explicit ThreadPool(const ThreadPoolBuilder& builder); @@ -255,6 +389,22 @@ class ThreadPool { // Releases token 't' and invalidates it. void ReleaseToken(ThreadPoolToken* t); + // Notify the load meter (if enabled) on relevant updates. If no information + // on dequeued task is available, 'queue_time' should be omitted (or be an + // uninitialized MonoDelta instance). + // + // The LoadMeter should be notified about events which affect the criterion + // to evaluate the state of the queue (overloaded vs normal). The criterion + // uses the following information: + // * queue time of a newly dequeued task + // * availability of spare worker threads + // * submit time of the task at the head of the queue + // This means that LoadMeter should be notified about the following events: + // * a task at the head of the queue has been dispatched to be run + // * a worker thread completes running a task + // * a new task has been scheduled (i.e. added into the queue) + void NotifyLoadMeterUnlocked(const MonoDelta& queue_time = MonoDelta()); + const std::string name_; const int min_threads_; const int max_threads_; @@ -341,6 +491,10 @@ class ThreadPool { // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. std::unique_ptr<ThreadPoolToken> tokenless_; + // The meter to track whether the pool's queue is stalled/overloaded. + // It's nullptr/none if the queue overload threshold is unset. + std::unique_ptr<QueueLoadMeter> load_meter_; + // Metrics for the entire thread pool. const ThreadPoolMetrics metrics_;
