threadpool: Allow zero-size task queue Previously, our threadpool implementation's concept of max_queue_size was not very useful, and one could not specify a zero-size queue, because we took the meaning of queue size quite literally, thus leaking an implementation detail: we use the task queue to hand off tasks from the user's thread to our worker threads.
Now, max_queue_size is more intuitive: the user is allowed to submit (max_queue_size + max_threads) tasks before new submissions are rejected (assuming no task completes in the mean time). In this paradigm, a zero-size queue is a useful thing. It implies that the total number of tasks running or queued at a given time will never exceed max_threads, which under typical circumstances means that no successfully-submitted task is left waiting for an executor for very long. Added a new functional test for max_queue_size = 0 and updated an existing queue-related test that is now deterministic instead of being racy. Change-Id: I5abf40473ee813c625e0a02232d714aab2e65109 Reviewed-on: http://gerrit.cloudera.org:8080/5275 Reviewed-by: Todd Lipcon <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c52cc163 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c52cc163 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c52cc163 Branch: refs/heads/master Commit: c52cc1630d76c123619e40578b82cb2c7e8560c9 Parents: 730bae3 Author: Mike Percy <[email protected]> Authored: Wed Nov 30 14:47:26 2016 +0000 Committer: Todd Lipcon <[email protected]> Committed: Thu Dec 1 01:48:24 2016 +0000 ---------------------------------------------------------------------- src/kudu/util/threadpool-test.cc | 32 +++++++++++++++++++++++++------- src/kudu/util/threadpool.cc | 20 ++++++++++---------- 2 files changed, 35 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/c52cc163/src/kudu/util/threadpool-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc index 2f8e2fa..444fbf4 100644 --- a/src/kudu/util/threadpool-test.cc +++ b/src/kudu/util/threadpool-test.cc @@ -223,20 +223,39 @@ TEST(TestThreadPool, TestMaxQueueSize) { .set_max_queue_size(1).Build(&thread_pool)); CountDownLatch latch(1); + // We will be able to submit two tasks: one for max_threads == 1 and one for + // max_queue_size == 1. + ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); Status s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))); - // We race against the worker thread to re-enqueue. - // If we get there first, we fail on the 2nd Submit(). - // If the worker dequeues first, we fail on the 3rd. - if (s.ok()) { - s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))); - } CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString(); latch.CountDown(); thread_pool->Wait(); thread_pool->Shutdown(); } +// Test that when we specify a zero-sized queue, the maximum number of threads +// running is used for enforcement. +TEST(TestThreadPool, TestZeroQueueSize) { + gscoped_ptr<ThreadPool> thread_pool; + const int kMaxThreads = 4; + ASSERT_OK(ThreadPoolBuilder("test") + .set_max_queue_size(0) + .set_max_threads(kMaxThreads) + .Build(&thread_pool)); + + CountDownLatch latch(1); + for (int i = 0; i < kMaxThreads; i++) { + ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + } + Status s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))); + ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity"); + latch.CountDown(); + thread_pool->Wait(); + thread_pool->Shutdown(); +} + // Test that setting a promise from another thread yields // a value on the current thread. TEST(TestThreadPool, TestPromises) { @@ -252,7 +271,6 @@ TEST(TestThreadPool, TestPromises) { thread_pool->Shutdown(); } - METRIC_DEFINE_entity(test_entity); METRIC_DEFINE_histogram(test_entity, queue_length, "queue length", MetricUnit::kTasks, "queue length", 1000, 1); http://git-wip-us.apache.org/repos/asf/kudu/blob/c52cc163/src/kudu/util/threadpool.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc index 00cd6f8..b2def9c 100644 --- a/src/kudu/util/threadpool.cc +++ b/src/kudu/util/threadpool.cc @@ -82,7 +82,6 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) { } ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { - CHECK_GT(max_queue_size, 0); max_queue_size_ = max_queue_size; return *this; } @@ -191,9 +190,12 @@ Status ThreadPool::Submit(const std::shared_ptr<Runnable>& task) { } // Size limit check. - if (queue_size_ == max_queue_size_) { - return Status::ServiceUnavailable(Substitute("Thread pool queue is full ($0 items)", - queue_size_)); + int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ + + static_cast<int64_t>(max_queue_size_) - queue_size_; + if (capacity_remaining < 1) { + return Status::ServiceUnavailable( + Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)", + num_threads_, max_threads_, queue_size_, max_queue_size_)); } // Should we create another thread? @@ -213,12 +215,11 @@ Status ThreadPool::Submit(const std::shared_ptr<Runnable>& task) { if (num_threads_ == 0) { // If we have no threads, we can't do any work. return status; - } else { - // If we failed to create a thread, but there are still some other - // worker threads, log a warning message and continue. - LOG(WARNING) << "Thread pool failed to create thread: " - << status.ToString(); } + // If we failed to create a thread, but there are still some other + // worker threads, log a warning message and continue. + LOG(ERROR) << "Thread pool failed to create thread: " + << status.ToString(); } } @@ -281,7 +282,6 @@ void ThreadPool::SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) run_time_us_histogram_ = hist; } - void ThreadPool::DispatchThread(bool permanent) { MutexLock unique_lock(lock_); while (true) {
