This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.13.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit ea4b91e38298a9cb75f9919adc62deebfbc9b788 Author: Alexey Serbin <[email protected]> AuthorDate: Sun Sep 6 17:15:41 2020 -0700 [threadpool] optimize queue overload detection This patch simplifies the queue overload detection and makes it a bit more robust, according to 'perf stat' running KUDU_ALLOW_SLOW_TESTS=1 perf stat ./bin/threadpool-test \ --gtest_filter='*ThreadPoolPerformanceTest.ConcurrentAndSerialTasksMix/1' Before: 147699.960062 task-clock # 44.348 CPUs utilized 371,519 context-switches # 0.003 M/sec 653 cpu-migrations # 0.004 K/sec 3,664 page-faults # 0.025 K/sec 423,794,029,838 cycles # 2.869 GHz 94,656,186,092 instructions # 0.22 insns per cycle 34,018,899,188 branches # 230.324 M/sec 22,374,862 branch-misses # 0.07% of all branches 3.330492839 seconds time elapsed After: 126357.374737 task-clock # 40.768 CPUs utilized 350,907 context-switches # 0.003 M/sec 3,167 cpu-migrations # 0.025 K/sec 3,478 page-faults # 0.028 K/sec 362,348,476,629 cycles # 2.868 GHz 82,964,368,394 instructions # 0.23 insns per cycle 29,553,336,891 branches # 233.887 M/sec 16,945,558 branch-misses # 0.06% of all branches 3.099419461 seconds time elapsed Change-Id: Ic01ca7419beba92d7067c60ef520811136d67587 Reviewed-on: http://gerrit.cloudera.org:8080/16424 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <[email protected]> Reviewed-by: Andrew Wong <[email protected]> (cherry picked from commit c4997af4b90b4a102597775e575320c95eb3c1ba) Reviewed-on: http://gerrit.cloudera.org:8080/16429 Reviewed-by: Attila Bukor <[email protected]> --- src/kudu/util/threadpool.cc | 28 ++++++++++++++++------------ src/kudu/util/threadpool.h | 11 ++++++++--- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc index 6e2097d..569c968 100644 --- a/src/kudu/util/threadpool.cc +++ b/src/kudu/util/threadpool.cc @@ -771,6 +771,7 @@ ThreadPool::QueueLoadMeter::QueueLoadMeter( : tpool_(tpool), queue_time_threshold_(queue_time_threshold), queue_time_history_length_(queue_time_history_length), + over_queue_threshold_num_(0), queue_head_submit_time_(MonoTime()), overloaded_since_(MonoTime()), has_spare_thread_(true) { @@ -826,18 +827,23 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked( const MonoTime& queue_head_submit_time, bool has_spare_thread) { tpool_.lock_.AssertAcquired(); + if (task_queue_time.Initialized()) { - // TODO(aserbin): any better way of tracking the running minimum of N numbers? queue_times_.emplace_back(task_queue_time); - queue_times_ordered_.insert(task_queue_time); + // Given the criterion to detect whether the queue is overloaded, it's not + // exactly necessary to track the running minimum of queue times for the + // specified history window. It's enough just to keep an eye on whether the + // window contains at least one element that's not over the threshold. + if (task_queue_time > queue_time_threshold_) { + ++over_queue_threshold_num_; + } if (queue_times_.size() > queue_time_history_length_) { const auto& elem = queue_times_.front(); - auto it = queue_times_ordered_.find(elem); - DCHECK(it != queue_times_ordered_.end()); - queue_times_ordered_.erase(it); + if (elem > queue_time_threshold_) { + --over_queue_threshold_num_; + } queue_times_.pop_front(); } - min_queue_time_ = *queue_times_.begin(); } queue_head_submit_time_.store(queue_head_submit_time); has_spare_thread_.store(has_spare_thread); @@ -848,11 +854,10 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked( const bool queue_empty = !queue_head_submit_time.Initialized(); const auto queue_time = queue_empty ? MonoDelta::FromSeconds(0) : now - queue_head_submit_time; - const auto min_queue_time = min_queue_time_.Initialized() - ? min_queue_time_ : MonoDelta::FromSeconds(0); const bool was_overloaded = overloaded_since_.load().Initialized(); const bool overloaded = !has_spare_thread && - std::max(min_queue_time, queue_time) > queue_time_threshold_; + (over_queue_threshold_num_ == queue_time_history_length_ || + queue_time > queue_time_threshold_); // Track the state transitions and update overloaded_since_. if (!was_overloaded && overloaded) { VLOG(3) << Substitute("state transition: normal --> overloaded"); @@ -863,13 +868,12 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked( } VLOG(4) << Substitute("state refreshed: overloaded since $0, queue $1, " "has $2 thread, queue head submit time $3, " - "queue time $4, min queue time $5", + "queue time $4", overloaded_since_.load().ToString(), queue_empty ? "empty" : "not empty", has_spare_thread ? "spare" : "no spare", queue_head_submit_time.ToString(), - queue_time.ToString(), - min_queue_time.ToString()); + queue_time.ToString()); } std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) { diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h index 403acbd..ff1fae8 100644 --- a/src/kudu/util/threadpool.h +++ b/src/kudu/util/threadpool.h @@ -16,13 +16,14 @@ // under the License. #pragma once +#include <sys/types.h> + #include <atomic> #include <cstddef> #include <deque> #include <functional> #include <iosfwd> #include <memory> -#include <set> #include <string> #include <unordered_set> @@ -340,11 +341,15 @@ class ThreadPool { // to the parameter 'N' in the description of the algorithm above. const size_t queue_time_history_length_; + // Number of elements in the queue history measurement window which are + // over the threshold specified by 'queue_time_threshold_'. Using the + // terminology from above, (min(QT_historic(M) > T_overload) iff + // (over_queue_threshold_num_ == M). + ssize_t over_queue_threshold_num_; + // Queue timings of the most recent samples. The size of these containers // is kept under queue_time_history_length_ limit. std::deque<MonoDelta> queue_times_; - std::multiset<MonoDelta> queue_times_ordered_; - MonoDelta min_queue_time_; // Below fields are to store the latest snapshot of the information about // the task queue of the pool the meter is attached to.
