http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/threadpool-test.cc b/be/src/kudu/util/threadpool-test.cc new file mode 100644 index 0000000..23fc45c --- /dev/null +++ b/be/src/kudu/util/threadpool-test.cc @@ -0,0 +1,941 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// NOTE(review): This span is a collapsed git-diff dump — the leading '+'
// characters are patch markers and many physical source lines were joined
// into one. Code content is left byte-identical; only these review comments
// are added. The span covers: the include/using prologue of
// threadpool-test.cc, the ThreadPoolTest fixture (SetUp builds a pool named
// "test"; RebuildPoolWithBuilder/RebuildPoolWithMinMax replace pool_), the
// trivial open/close test, the SimpleTask counter task plus its free-function
// twin, TestSimpleTasks (asserts 10+15+20+15+123 total increments across
// SubmitFunc/Submit/SubmitClosure), and the trace helper IssueTraceStatement.
+ +#include <unistd.h> + +#include <atomic> +#include <cstdint> +#include <iterator> +#include <limits> +#include <memory> +#include <mutex> +#include <ostream> +#include <string> +#include <thread> +#include <utility> +#include <vector> + +#include <boost/bind.hpp> // IWYU pragma: keep +#include <boost/smart_ptr/shared_ptr.hpp> +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/bind.h" +#include "kudu/gutil/bind_helpers.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/sysinfo.h" +#include "kudu/util/barrier.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/locks.h" +#include "kudu/util/metrics.h" +#include "kudu/util/monotime.h" +#include "kudu/util/promise.h" +#include "kudu/util/random.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" +#include "kudu/util/threadpool.h" +#include "kudu/util/trace.h" + +using std::atomic; +using std::shared_ptr; +using std::string; +using std::thread; +using std::unique_ptr; +using std::vector; + +using strings::Substitute; + +DECLARE_int32(thread_inject_start_latency_ms); + +namespace kudu { + +static const char* kDefaultPoolName = "test"; + +class ThreadPoolTest : public KuduTest { + public: + + virtual void SetUp() override { + KuduTest::SetUp(); + ASSERT_OK(ThreadPoolBuilder(kDefaultPoolName).Build(&pool_)); + } + + Status RebuildPoolWithBuilder(const ThreadPoolBuilder& builder) { + return builder.Build(&pool_); + } + + Status RebuildPoolWithMinMax(int min_threads, int max_threads) { + return ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .Build(&pool_); + } + + protected: + gscoped_ptr<ThreadPool> pool_; +}; + +TEST_F(ThreadPoolTest, 
TestNoTaskOpenClose) { + ASSERT_OK(RebuildPoolWithMinMax(4, 4)); + pool_->Shutdown(); +} + +static void SimpleTaskMethod(int n, Atomic32 *counter) { + while (n--) { + base::subtle::NoBarrier_AtomicIncrement(counter, 1); + boost::detail::yield(n); + } +} + +class SimpleTask : public Runnable { + public: + SimpleTask(int n, Atomic32 *counter) + : n_(n), counter_(counter) { + } + + void Run() OVERRIDE { + SimpleTaskMethod(n_, counter_); + } + + private: + int n_; + Atomic32 *counter_; +}; + +TEST_F(ThreadPoolTest, TestSimpleTasks) { + ASSERT_OK(RebuildPoolWithMinMax(4, 4)); + + Atomic32 counter(0); + std::shared_ptr<Runnable> task(new SimpleTask(15, &counter)); + + ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 10, &counter))); + ASSERT_OK(pool_->Submit(task)); + ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 20, &counter))); + ASSERT_OK(pool_->Submit(task)); + ASSERT_OK(pool_->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter))); + pool_->Wait(); + ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&counter)); + pool_->Shutdown(); +} + +static void IssueTraceStatement() { + TRACE("hello from task"); +} + +// Test that the thread-local trace is propagated to tasks +// submitted to the threadpool. 
// NOTE(review): Collapsed diff dump; code left byte-identical, comments only.
// Covers: TestTracePropagation (a Trace adopted in the submitting scope is
// visible to the task via TRACE), TestSubmitAfterShutdown (asserts the exact
// Status string after Shutdown), the SlowTask helper that blocks on a
// CountDownLatch until released, TestThreadPoolWithNoMinimum (threads are
// created on demand up to max and die back to zero after the idle timeout),
// and TestThreadPoolWithNoMaxThreads (with max_threads == INT_MAX each
// tokenless task spawns a thread, while two SERIAL tokens add exactly two).
// NOTE(review): these tests assert exact num_threads() counts immediately
// after Submit — presumably Submit starts threads synchronously; confirm
// against the ThreadPool implementation before relying on it.
+TEST_F(ThreadPoolTest, TestTracePropagation) { + ASSERT_OK(RebuildPoolWithMinMax(1, 1)); + + scoped_refptr<Trace> t(new Trace); + { + ADOPT_TRACE(t.get()); + ASSERT_OK(pool_->SubmitFunc(&IssueTraceStatement)); + } + pool_->Wait(); + ASSERT_STR_CONTAINS(t->DumpToString(), "hello from task"); +} + +TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) { + ASSERT_OK(RebuildPoolWithMinMax(1, 1)); + pool_->Shutdown(); + Status s = pool_->SubmitFunc(&IssueTraceStatement); + ASSERT_EQ("Service unavailable: The pool has been shut down.", + s.ToString()); +} + +class SlowTask : public Runnable { + public: + explicit SlowTask(CountDownLatch* latch) + : latch_(latch) { + } + + void Run() OVERRIDE { + latch_->Wait(); + } + + static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) { + return std::make_shared<SlowTask>(latch); + } + + private: + CountDownLatch* latch_; +}; + +TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(3) + .set_idle_timeout(MonoDelta::FromMilliseconds(1)))); + + // There are no threads to start with. + ASSERT_TRUE(pool_->num_threads() == 0); + // We get up to 3 threads when submitting work. + CountDownLatch latch(1); + SCOPED_CLEANUP({ + latch.CountDown(); + }); + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_EQ(2, pool_->num_threads()); + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_EQ(3, pool_->num_threads()); + // The 4th piece of work gets queued. 
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_EQ(3, pool_->num_threads()); + // Finish all work + latch.CountDown(); + pool_->Wait(); + ASSERT_EQ(0, pool_->active_threads_); + pool_->Shutdown(); + ASSERT_EQ(0, pool_->num_threads()); +} + +TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) { + // By default a threadpool's max_threads is set to the number of CPUs, so + // this test submits more tasks than that to ensure that the number of CPUs + // isn't some kind of upper bound. + const int kNumCPUs = base::NumCPUs(); + + // Build a threadpool with no limit on the maximum number of threads. + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(std::numeric_limits<int>::max()))); + CountDownLatch latch(1); + auto cleanup_latch = MakeScopedCleanup([&]() { + latch.CountDown(); + }); + + // Submit tokenless tasks. Each should create a new thread. + for (int i = 0; i < kNumCPUs * 2; i++) { + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + } + ASSERT_EQ((kNumCPUs * 2), pool_->num_threads()); + + // Submit tasks on two tokens. Only two threads should be created. + unique_ptr<ThreadPoolToken> t1 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL); + unique_ptr<ThreadPoolToken> t2 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL); + for (int i = 0; i < kNumCPUs * 2; i++) { + ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get(); + ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch))); + } + ASSERT_EQ((kNumCPUs * 2) + 2, pool_->num_threads()); + + // Submit more tokenless tasks. Each should create a new thread. + for (int i = 0; i < kNumCPUs; i++) { + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + } + ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads()); + + latch.CountDown(); + pool_->Wait(); + pool_->Shutdown(); +} + +// Regression test for a bug where a task is submitted exactly +// as a thread is about to exit. Previously this could hang forever. 
// NOTE(review): Collapsed diff dump; code left byte-identical, comments only.
// Covers: TestRace (500 submit/exit races against a 1us idle timeout, guarded
// by a 60s alarm so a regression fails instead of hanging),
// TestVariableSizeThreadPool (min 1 / max 4: thread count grows per queued
// SlowTask and the 5th submission queues), TestMaxQueueSize (1 thread + queue
// of 1 means the 3rd submission is rejected with ServiceUnavailable),
// TestZeroQueueSize (queue of 0 falls back to max_threads for capacity
// enforcement and rejects with "Thread pool is at capacity"), and the start
// of TestSlowThreadStart (KUDU-2187: injects 3000ms thread-start latency via
// FLAGS_thread_inject_start_latency_ms; the test body continues below).
+TEST_F(ThreadPoolTest, TestRace) { + alarm(60); + auto cleanup = MakeScopedCleanup([]() { + alarm(0); // Disable alarm on test exit. + }); + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(1) + .set_idle_timeout(MonoDelta::FromMicroseconds(1)))); + + for (int i = 0; i < 500; i++) { + CountDownLatch l(1); + ASSERT_OK(pool_->SubmitFunc(boost::bind(&CountDownLatch::CountDown, &l))); + l.Wait(); + // Sleeping a different amount in each iteration makes it more likely to hit + // the bug. + SleepFor(MonoDelta::FromMicroseconds(i)); + } +} + +TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(4) + .set_idle_timeout(MonoDelta::FromMilliseconds(1)))); + + // There is 1 thread to start with. + ASSERT_EQ(1, pool_->num_threads()); + // We get up to 4 threads when submitting work. + CountDownLatch latch(1); + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_EQ(1, pool_->num_threads()); + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_EQ(2, pool_->num_threads()); + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_EQ(3, pool_->num_threads()); + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_EQ(4, pool_->num_threads()); + // The 5th piece of work gets queued. + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_EQ(4, pool_->num_threads()); + // Finish all work + latch.CountDown(); + pool_->Wait(); + ASSERT_EQ(0, pool_->active_threads_); + pool_->Shutdown(); + ASSERT_EQ(0, pool_->num_threads()); +} + +TEST_F(ThreadPoolTest, TestMaxQueueSize) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1))); + + CountDownLatch latch(1); + // We will be able to submit two tasks: one for max_threads == 1 and one for + // max_queue_size == 1. 
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + Status s = pool_->Submit(SlowTask::NewSlowTask(&latch)); + CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString(); + latch.CountDown(); + pool_->Wait(); + pool_->Shutdown(); +} + +// Test that when we specify a zero-sized queue, the maximum number of threads +// running is used for enforcement. +TEST_F(ThreadPoolTest, TestZeroQueueSize) { + const int kMaxThreads = 4; + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_queue_size(0) + .set_max_threads(kMaxThreads))); + + CountDownLatch latch(1); + for (int i = 0; i < kMaxThreads; i++) { + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + } + Status s = pool_->Submit(SlowTask::NewSlowTask(&latch)); + ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity"); + latch.CountDown(); + pool_->Wait(); + pool_->Shutdown(); +} + +// Regression test for KUDU-2187: +// +// If a threadpool thread is slow to start up, it shouldn't block progress of +// other tasks on the same pool. +TEST_F(ThreadPoolTest, TestSlowThreadStart) { + // Start a pool of threads from which we'll submit tasks. + gscoped_ptr<ThreadPool> submitter_pool; + ASSERT_OK(ThreadPoolBuilder("submitter") + .set_min_threads(5) + .set_max_threads(5) + .Build(&submitter_pool)); + + // Start the actual test pool, which starts with one thread + // but will start a second one on-demand. + ASSERT_OK(RebuildPoolWithMinMax(1, 2)); + // Ensure that the second thread will take a long time to start. + FLAGS_thread_inject_start_latency_ms = 3000; + + // Now submit 10 tasks to the 'submitter' pool, each of which + // submits a single task to 'pool_'. The 'pool_' task sleeps + // for 10ms. 
+ // + // Because the 'submitter' tasks submit faster than they can be + // processed on a single thread (due to the sleep), we expect that + // this will trigger 'pool_' to start up its second worker thread. + // The thread startup will have some latency injected. + // + // We expect that the thread startup will block only one of the + // tasks in the 'submitter' pool after it submits its task. Other + // tasks will continue to be processed by the other (already-running) + // thread on 'pool_'. + std::atomic<int32_t> total_queue_time_ms(0); + for (int i = 0; i < 10; i++) { + ASSERT_OK(submitter_pool->SubmitFunc([&]() { + auto submit_time = MonoTime::Now(); + CHECK_OK(pool_->SubmitFunc([&,submit_time]() { + auto queue_time = MonoTime::Now() - submit_time; + total_queue_time_ms += queue_time.ToMilliseconds(); + SleepFor(MonoDelta::FromMilliseconds(10)); + })); + })); + } + submitter_pool->Wait(); + pool_->Wait(); + + // Since the total amount of work submitted was only 100ms, we expect + // that the performance would be equivalent to a single-threaded + // threadpool. So, we expect the total queue time to be approximately + // 0 + 10 + 20 ... + 80 + 90 = 450ms. + // + // If, instead, throughput had been blocked while starting threads, + // we'd get something closer to 18000ms (3000ms delay * 5 submitter threads). + ASSERT_GE(total_queue_time_ms, 400); + ASSERT_LE(total_queue_time_ms, 10000); +} + +// Test that setting a promise from another thread yields +// a value on the current thread. 
// NOTE(review): Collapsed diff dump; apart from the one fix noted below, the
// code is byte-identical to the original. Covers: TestPromises (a value Set
// from a pool thread is observed via Promise::Get on the test thread), the
// three METRIC_DEFINE_histogram declarations, and TestMetrics, which checks
// that per-token histograms count only their own submissions (1 for t1, 2
// for t2) while the pool-wide histograms count all 6.
//
// FIX(review): the original dump contained "®istry" — HTML-entity mojibake
// where "&reg" in "&registry" was rendered as the '®' character by the web
// scrape. Restored to "&registry", which is the address-of argument that
// METRIC_ENTITY_test_entity.Instantiate() takes for the MetricRegistry
// declared two lines above. No other token changed.
+TEST_F(ThreadPoolTest, TestPromises) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1))); + + Promise<int> my_promise; + ASSERT_OK(pool_->SubmitClosure( + Bind(&Promise<int>::Set, Unretained(&my_promise), 5))); + ASSERT_EQ(5, my_promise.Get()); + pool_->Shutdown(); +} + +METRIC_DEFINE_entity(test_entity); +METRIC_DEFINE_histogram(test_entity, queue_length, "queue length", + MetricUnit::kTasks, "queue length", 1000, 1); + +METRIC_DEFINE_histogram(test_entity, queue_time, "queue time", + MetricUnit::kMicroseconds, "queue time", 1000000, 1); + +METRIC_DEFINE_histogram(test_entity, run_time, "run time", + MetricUnit::kMicroseconds, "run time", 1000, 1); + +TEST_F(ThreadPoolTest, TestMetrics) { + MetricRegistry registry; + vector<ThreadPoolMetrics> all_metrics; + for (int i = 0; i < 3; i++) { + scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate( + &registry, Substitute("test $0", i)); + all_metrics.emplace_back(ThreadPoolMetrics{ + METRIC_queue_length.Instantiate(entity), + METRIC_queue_time.Instantiate(entity), + METRIC_run_time.Instantiate(entity) + }); + } + + // Enable metrics for the thread pool. + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_metrics(all_metrics[0]))); + + unique_ptr<ThreadPoolToken> t1 = pool_->NewTokenWithMetrics( + ThreadPool::ExecutionMode::SERIAL, all_metrics[1]); + unique_ptr<ThreadPoolToken> t2 = pool_->NewTokenWithMetrics( + ThreadPool::ExecutionMode::SERIAL, all_metrics[2]); + + // Submit once to t1, twice to t2, and three times without a token. 
+ ASSERT_OK(t1->SubmitFunc([](){})); + ASSERT_OK(t2->SubmitFunc([](){})); + ASSERT_OK(t2->SubmitFunc([](){})); + ASSERT_OK(pool_->SubmitFunc([](){})); + ASSERT_OK(pool_->SubmitFunc([](){})); + ASSERT_OK(pool_->SubmitFunc([](){})); + pool_->Wait(); + + // The total counts should reflect the number of submissions to each token. + ASSERT_EQ(1, all_metrics[1].queue_length_histogram->TotalCount()); + ASSERT_EQ(1, all_metrics[1].queue_time_us_histogram->TotalCount()); + ASSERT_EQ(1, all_metrics[1].run_time_us_histogram->TotalCount()); + ASSERT_EQ(2, all_metrics[2].queue_length_histogram->TotalCount()); + ASSERT_EQ(2, all_metrics[2].queue_time_us_histogram->TotalCount()); + ASSERT_EQ(2, all_metrics[2].run_time_us_histogram->TotalCount()); + + // And the counts on the pool-wide metrics should reflect all submissions. + ASSERT_EQ(6, all_metrics[0].queue_length_histogram->TotalCount()); + ASSERT_EQ(6, all_metrics[0].queue_time_us_histogram->TotalCount()); + ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount()); +} + +// Test that a thread pool will crash if asked to run its own blocking +// functions in a pool thread. +// +// In a multi-threaded application, TSAN is unsafe to use following a fork(). +// After a fork(), TSAN will: +// 1. Disable verification, expecting an exec() soon anyway, and +// 2. Die on future thread creation. +// For some reason, this test triggers behavior #2. We could disable it with +// the TSAN option die_after_fork=0, but this can (supposedly) lead to +// deadlocks, so we'll disable the entire test instead. 
// NOTE(review): Collapsed diff dump; code left byte-identical, comments only.
// Covers: TestDeadlocks (ASSERT_DEATH forks a child that submits the pool's
// own Shutdown/Wait as tasks and expects a crash with the deadlock message;
// disabled under TSAN per the note above because TSAN dies on thread creation
// after fork), SlowDestructorRunnable (a Runnable whose dtor sleeps 100ms),
// and TestSlowDestructor (100 such tasks on a 1-20 thread pool must finish in
// under 5s, i.e. slow task destruction must not serialize the queue).
+#ifndef THREAD_SANITIZER +TEST_F(ThreadPoolTest, TestDeadlocks) { + const char* death_msg = "called pool function that would result in deadlock"; + ASSERT_DEATH({ + ASSERT_OK(RebuildPoolWithMinMax(1, 1)); + ASSERT_OK(pool_->SubmitClosure( + Bind(&ThreadPool::Shutdown, Unretained(pool_.get())))); + pool_->Wait(); + }, death_msg); + + ASSERT_DEATH({ + ASSERT_OK(RebuildPoolWithMinMax(1, 1)); + ASSERT_OK(pool_->SubmitClosure( + Bind(&ThreadPool::Wait, Unretained(pool_.get())))); + pool_->Wait(); + }, death_msg); +} +#endif + +class SlowDestructorRunnable : public Runnable { + public: + void Run() override {} + + virtual ~SlowDestructorRunnable() { + SleepFor(MonoDelta::FromMilliseconds(100)); + } +}; + +// Test that if a tasks's destructor is slow, it doesn't cause serialization of the tasks +// in the queue. +TEST_F(ThreadPoolTest, TestSlowDestructor) { + ASSERT_OK(RebuildPoolWithMinMax(1, 20)); + MonoTime start = MonoTime::Now(); + for (int i = 0; i < 100; i++) { + shared_ptr<Runnable> task(new SlowDestructorRunnable()); + ASSERT_OK(pool_->Submit(std::move(task))); + } + pool_->Wait(); + ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5); +} + +// For test cases that should run with both kinds of tokens. 
// NOTE(review): Collapsed diff dump; code left byte-identical, comments only.
// This span holds the token-related tests, parameterized over SERIAL and
// CONCURRENT execution modes via ThreadPoolTestTokenTypes/TEST_P:
//  - TestTokenSubmitAndWait: token Wait() blocks until its task runs.
//  - TestTokenSubmitsProcessedSerially: SERIAL token preserves submit order
//    ("abcde") despite randomized per-task sleeps.
//  - TestTokenSubmitsProcessedConcurrently / TestTokenSubmitsNonSequential:
//    Barrier-based checks that tasks on distinct tokens (or one CONCURRENT
//    token) truly run in parallel; a 60s alarm converts deadlock to failure.
//  - TestTokenShutdown: shutting down one token does not wait on another's
//    tasks; a shut-down token rejects with ServiceUnavailable.
//  - TestTokenWaitForAll: pool Wait() covers token and tokenless work alike.
//  - TestFuzz: 1000 randomized submit/allocate/wait/shutdown/deallocate ops.
//  - TestTokenSubmissionsAdhereToMaxQueueSize: token submissions count
//    against the pool's queue cap (third submission rejected).
//  - TestTokenConcurrency: 1s stress run with cycler/shutdown/wait/submit
//    thread groups racing over 20 shared tokens behind a simple_spinlock.
//  - TestLIFOThreadWakeUps: after 10 workers go idle, a slow trickle of fast
//    tasks should keep reusing one thread (LIFO wake-up) while the rest idle
//    out, checked with AssertEventually.
// Closes namespace kudu at the end of threadpool-test.cc.
+class ThreadPoolTestTokenTypes : public ThreadPoolTest, + public testing::WithParamInterface<ThreadPool::ExecutionMode> {}; + +INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes, + ::testing::Values(ThreadPool::ExecutionMode::SERIAL, + ThreadPool::ExecutionMode::CONCURRENT)); + + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) { + unique_ptr<ThreadPoolToken> t = pool_->NewToken(GetParam()); + int i = 0; + ASSERT_OK(t->SubmitFunc([&]() { + SleepFor(MonoDelta::FromMilliseconds(1)); + i++; + })); + t->Wait(); + ASSERT_EQ(1, i); +} + +TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) { + unique_ptr<ThreadPoolToken> t = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL); + Random r(SeedRandom()); + string result; + for (char c = 'a'; c < 'f'; c++) { + // Sleep a little first so that there's a higher chance of out-of-order + // appends if the submissions did execute in parallel. + int sleep_ms = r.Next() % 5; + ASSERT_OK(t->SubmitFunc([&result, c, sleep_ms]() { + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + result += c; + })); + } + t->Wait(); + ASSERT_EQ("abcde", result); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) { + const int kNumTokens = 5; + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumTokens))); + vector<unique_ptr<ThreadPoolToken>> tokens; + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumTokens + 1); + for (int i = 0; i < kNumTokens; i++) { + tokens.emplace_back(pool_->NewToken(GetParam())); + ASSERT_OK(tokens.back()->SubmitFunc([b]() { + b->Wait(); + })); + } + + // This will deadlock if the above tasks weren't all running concurrently. 
+ b->Wait(); +} + +TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) { + const int kNumSubmissions = 5; + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumSubmissions))); + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumSubmissions + 1); + unique_ptr<ThreadPoolToken> t = pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT); + for (int i = 0; i < kNumSubmissions; i++) { + ASSERT_OK(t->SubmitFunc([b]() { + b->Wait(); + })); + } + + // This will deadlock if the above tasks weren't all running concurrently. + b->Wait(); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(4))); + + unique_ptr<ThreadPoolToken> t1(pool_->NewToken(GetParam())); + unique_ptr<ThreadPoolToken> t2(pool_->NewToken(GetParam())); + CountDownLatch l1(1); + CountDownLatch l2(1); + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + + for (int i = 0; i < 3; i++) { + ASSERT_OK(t1->SubmitFunc([&]() { + l1.Wait(); + })); + } + for (int i = 0; i < 3; i++) { + ASSERT_OK(t2->SubmitFunc([&]() { + l2.Wait(); + })); + } + + // Unblock all of t1's tasks, but not t2's tasks. + l1.CountDown(); + + // If this also waited for t2's tasks, it would deadlock. + t1->Shutdown(); + + // We can no longer submit to t1 but we can still submit to t2. + ASSERT_TRUE(t1->SubmitFunc([](){}).IsServiceUnavailable()); + ASSERT_OK(t2->SubmitFunc([](){})); + + // Unblock t2's tasks. 
+ l2.CountDown(); + t2->Shutdown(); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) { + const int kNumTokens = 3; + const int kNumSubmissions = 20; + Random r(SeedRandom()); + vector<unique_ptr<ThreadPoolToken>> tokens; + for (int i = 0; i < kNumTokens; i++) { + tokens.emplace_back(pool_->NewToken(GetParam())); + } + + atomic<int32_t> v(0); + for (int i = 0; i < kNumSubmissions; i++) { + // Sleep a little first to raise the likelihood of the test thread + // reaching Wait() before the submissions finish. + int sleep_ms = r.Next() % 5; + + auto task = [&v, sleep_ms]() { + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + v++; + }; + + // Half of the submissions will be token-less, and half will use a token. + if (i % 2 == 0) { + ASSERT_OK(pool_->SubmitFunc(task)); + } else { + int token_idx = r.Next() % tokens.size(); + ASSERT_OK(tokens[token_idx]->SubmitFunc(task)); + } + } + pool_->Wait(); + ASSERT_EQ(kNumSubmissions, v); +} + +TEST_F(ThreadPoolTest, TestFuzz) { + const int kNumOperations = 1000; + Random r(SeedRandom()); + vector<unique_ptr<ThreadPoolToken>> tokens; + + for (int i = 0; i < kNumOperations; i++) { + // Operation distribution: + // + // - Submit without a token: 40% + // - Submit with a randomly selected token: 35% + // - Allocate a new token: 10% + // - Wait on a randomly selected token: 7% + // - Shutdown a randomly selected token: 4% + // - Deallocate a randomly selected token: 2% + // - Wait for all submissions: 2% + int op = r.Next() % 100; + if (op < 40) { + // Submit without a token. + int sleep_ms = r.Next() % 5; + ASSERT_OK(pool_->SubmitFunc([sleep_ms]() { + // Sleep a little first to increase task overlap. + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + })); + } else if (op < 75) { + // Submit with a randomly selected token. 
+ if (tokens.empty()) { + continue; + } + int sleep_ms = r.Next() % 5; + int token_idx = r.Next() % tokens.size(); + Status s = tokens[token_idx]->SubmitFunc([sleep_ms]() { + // Sleep a little first to increase task overlap. + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + }); + ASSERT_TRUE(s.ok() || s.IsServiceUnavailable()); + } else if (op < 85) { + // Allocate a token with a randomly selected policy. + ThreadPool::ExecutionMode mode = r.Next() % 2 ? + ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + tokens.emplace_back(pool_->NewToken(mode)); + } else if (op < 92) { + // Wait on a randomly selected token. + if (tokens.empty()) { + continue; + } + int token_idx = r.Next() % tokens.size(); + tokens[token_idx]->Wait(); + } else if (op < 96) { + // Shutdown a randomly selected token. + if (tokens.empty()) { + continue; + } + int token_idx = r.Next() % tokens.size(); + tokens[token_idx]->Shutdown(); + } else if (op < 98) { + // Deallocate a randomly selected token. + if (tokens.empty()) { + continue; + } + auto it = tokens.begin(); + int token_idx = r.Next() % tokens.size(); + std::advance(it, token_idx); + tokens.erase(it); + } else { + // Wait on everything. + ASSERT_LT(op, 100); + ASSERT_GE(op, 98); + pool_->Wait(); + } + } + + // Some test runs will shut down the pool before the tokens, and some won't. + // Either way should be safe. + if (r.Next() % 2 == 0) { + pool_->Shutdown(); + } +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1))); + + CountDownLatch latch(1); + unique_ptr<ThreadPoolToken> t = pool_->NewToken(GetParam()); + SCOPED_CLEANUP({ + latch.CountDown(); + }); + // We will be able to submit two tasks: one for max_threads == 1 and one for + // max_queue_size == 1. 
+ ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch))); + ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch))); + Status s = t->Submit(SlowTask::NewSlowTask(&latch)); + ASSERT_TRUE(s.IsServiceUnavailable()); +} + +TEST_F(ThreadPoolTest, TestTokenConcurrency) { + const int kNumTokens = 20; + const int kTestRuntimeSecs = 1; + const int kCycleThreads = 2; + const int kShutdownThreads = 2; + const int kWaitThreads = 2; + const int kSubmitThreads = 8; + + vector<shared_ptr<ThreadPoolToken>> tokens; + Random rng(SeedRandom()); + + // Protects 'tokens' and 'rng'. + simple_spinlock lock; + + // Fetch a token from 'tokens' at random. + auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> { + std::lock_guard<simple_spinlock> l(lock); + int idx = rng.Uniform(kNumTokens); + return tokens[idx]; + }; + + // Preallocate all of the tokens. + for (int i = 0; i < kNumTokens; i++) { + ThreadPool::ExecutionMode mode; + { + std::lock_guard<simple_spinlock> l(lock); + mode = rng.Next() % 2 ? + ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + } + tokens.emplace_back(pool_->NewToken(mode).release()); + } + + atomic<int64_t> total_num_tokens_cycled(0); + atomic<int64_t> total_num_tokens_shutdown(0); + atomic<int64_t> total_num_tokens_waited(0); + atomic<int64_t> total_num_tokens_submitted(0); + + CountDownLatch latch(1); + vector<thread> threads; + + for (int i = 0; i < kCycleThreads; i++) { + // Pick a token at random and replace it. + // + // The replaced token is only destroyed when the last ref is dropped, + // possibly by another thread. + threads.emplace_back([&]() { + int num_tokens_cycled = 0; + while (latch.count()) { + { + std::lock_guard<simple_spinlock> l(lock); + int idx = rng.Uniform(kNumTokens); + ThreadPool::ExecutionMode mode = rng.Next() % 2 ? 
+ ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + tokens[idx] = shared_ptr<ThreadPoolToken>(pool_->NewToken(mode).release()); + } + num_tokens_cycled++; + + // Sleep a bit, otherwise this thread outpaces the other threads and + // nothing interesting happens to most tokens. + SleepFor(MonoDelta::FromMicroseconds(10)); + } + total_num_tokens_cycled += num_tokens_cycled; + }); + } + + for (int i = 0; i < kShutdownThreads; i++) { + // Pick a token at random and shut it down. Submitting a task to a shut + // down token will return a ServiceUnavailable error. + threads.emplace_back([&]() { + int num_tokens_shutdown = 0; + while (latch.count()) { + GetRandomToken()->Shutdown(); + num_tokens_shutdown++; + } + total_num_tokens_shutdown += num_tokens_shutdown; + }); + } + + for (int i = 0; i < kWaitThreads; i++) { + // Pick a token at random and wait for any outstanding tasks. + threads.emplace_back([&]() { + int num_tokens_waited = 0; + while (latch.count()) { + GetRandomToken()->Wait(); + num_tokens_waited++; + } + total_num_tokens_waited += num_tokens_waited; + }); + } + + for (int i = 0; i < kSubmitThreads; i++) { + // Pick a token at random and submit a task to it. + threads.emplace_back([&]() { + int num_tokens_submitted = 0; + Random rng(SeedRandom()); + while (latch.count()) { + int sleep_ms = rng.Next() % 5; + Status s = GetRandomToken()->SubmitFunc([sleep_ms]() { + // Sleep a little first so that tasks are running during other events. 
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + }); + CHECK(s.ok() || s.IsServiceUnavailable()); + num_tokens_submitted++; + } + total_num_tokens_submitted += num_tokens_submitted; + }); + } + + SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs)); + latch.CountDown(); + for (auto& t : threads) { + t.join(); + } + + LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1", + kCycleThreads, total_num_tokens_cycled.load()); + LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1", + kShutdownThreads, total_num_tokens_shutdown.load()); + LOG(INFO) << Substitute("Tokens waited ($0 threads): $1", + kWaitThreads, total_num_tokens_waited.load()); + LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1", + kSubmitThreads, total_num_tokens_submitted.load()); +} + +TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) { + const int kNumThreads = 10; + + // Test with a pool that allows for kNumThreads concurrent threads. + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumThreads))); + + // Submit kNumThreads slow tasks and unblock them, in order to produce + // kNumThreads worker threads. + CountDownLatch latch(1); + SCOPED_CLEANUP({ + latch.CountDown(); + }); + for (int i = 0; i < kNumThreads; i++) { + ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch))); + } + ASSERT_EQ(kNumThreads, pool_->num_threads()); + latch.CountDown(); + pool_->Wait(); + + // The kNumThreads threads are idle and waiting for the idle timeout. + + // Submit a slow trickle of lightning fast tasks. + // + // If the threads are woken up in FIFO order, this trickle is enough to + // prevent all of them from idling and the AssertEventually will time out. + // + // If LIFO order is used, the same thread will be reused for each task and + // the other threads will eventually time out. 
+ AssertEventually([&]() { + ASSERT_OK(pool_->SubmitFunc([](){})); + SleepFor(MonoDelta::FromMilliseconds(10)); + ASSERT_EQ(1, pool_->num_threads()); + }, MonoDelta::FromSeconds(10), AssertBackoff::NONE); + NO_PENDING_FATALS(); +} + +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/threadpool.cc b/be/src/kudu/util/threadpool.cc new file mode 100644 index 0000000..23dda3d --- /dev/null +++ b/be/src/kudu/util/threadpool.cc @@ -0,0 +1,766 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/threadpool.h" + +#include <cstdint> +#include <deque> +#include <limits> +#include <memory> +#include <ostream> +#include <string> +#include <utility> + +#include <boost/function.hpp> // IWYU pragma: keep +#include <glog/logging.h> + +#include "kudu/gutil/callback.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/sysinfo.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/metrics.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/thread.h" +#include "kudu/util/trace.h" +#include "kudu/util/trace_metrics.h" + +namespace kudu { + +using std::deque; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using strings::Substitute; + +//////////////////////////////////////////////////////// +// FunctionRunnable +//////////////////////////////////////////////////////// + +class FunctionRunnable : public Runnable { + public: + explicit FunctionRunnable(boost::function<void()> func) : func_(std::move(func)) {} + + void Run() OVERRIDE { + func_(); + } + + private: + boost::function<void()> func_; +}; + +//////////////////////////////////////////////////////// +// ClosureRunnable +//////////////////////////////////////////////////////// + +class ClosureRunnable : public Runnable { + public: + explicit ClosureRunnable(Closure cl) : cl_(std::move(cl)) {} + + void Run() OVERRIDE { + cl_.Run(); + } + + private: + Closure cl_; +}; + +//////////////////////////////////////////////////////// +// ThreadPoolBuilder +//////////////////////////////////////////////////////// + +ThreadPoolBuilder::ThreadPoolBuilder(string name) + : name_(std::move(name)), + min_threads_(0), + max_threads_(base::NumCPUs()), + max_queue_size_(std::numeric_limits<int>::max()), + idle_timeout_(MonoDelta::FromMilliseconds(500)) {} + +ThreadPoolBuilder& ThreadPoolBuilder::set_trace_metric_prefix(const string& prefix) { + trace_metric_prefix_ = prefix; + return *this; +} + 
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
  CHECK_GE(min_threads, 0);
  min_threads_ = min_threads;
  return *this;
}

ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
  CHECK_GT(max_threads, 0);
  max_threads_ = max_threads;
  return *this;
}

ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
  max_queue_size_ = max_queue_size;
  return *this;
}

ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
  idle_timeout_ = idle_timeout;
  return *this;
}

ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
  metrics_ = std::move(metrics);
  return *this;
}

// Constructs the pool and starts its minimum number of threads via Init().
Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
  pool->reset(new ThreadPool(*this));
  RETURN_NOT_OK((*pool)->Init());
  return Status::OK();
}

////////////////////////////////////////////////////////
// ThreadPoolToken
////////////////////////////////////////////////////////

ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
                                 ThreadPool::ExecutionMode mode,
                                 ThreadPoolMetrics metrics)
    : mode_(mode),
      metrics_(std::move(metrics)),
      pool_(pool),
      state_(State::IDLE),
      // Shares the pool's lock so token waiters and pool waiters are
      // serialized by the same mutex.
      not_running_cond_(&pool->lock_),
      active_threads_(0) {
}

ThreadPoolToken::~ThreadPoolToken() {
  Shutdown();
  pool_->ReleaseToken(this);
}

Status ThreadPoolToken::SubmitClosure(Closure c) {
  return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
}

Status ThreadPoolToken::SubmitFunc(boost::function<void()> f) {
  return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
}

Status ThreadPoolToken::Submit(shared_ptr<Runnable> r) {
  return pool_->DoSubmit(std::move(r), this);
}

// Removes this token's queued (not-yet-started) tasks, then blocks until any
// in-flight tasks finish, leaving the token in the terminal QUIESCED state.
void ThreadPoolToken::Shutdown() {
  MutexLock unique_lock(pool_->lock_);
  pool_->CheckNotPoolThreadUnlocked();

  // Clear the queue under the lock, but defer the releasing of the tasks
  // outside the lock, in case there are concurrent threads wanting to access
  // the ThreadPool. The task's destructors may acquire locks, etc, so this
  // also prevents lock inversions.
  std::deque<ThreadPool::Task> to_release = std::move(entries_);
  pool_->total_queued_tasks_ -= to_release.size();

  switch (state()) {
    case State::IDLE:
      // There were no tasks outstanding; we can quiesce the token immediately.
      Transition(State::QUIESCED);
      break;
    case State::RUNNING:
      // There were outstanding tasks. If any are still running, switch to
      // QUIESCING and wait for them to finish (the worker thread executing
      // the token's last task will switch the token to QUIESCED). Otherwise,
      // we can quiesce the token immediately.

      // Note: this is an O(n) operation, but it's expected to be infrequent.
      // Plus doing it this way (rather than switching to QUIESCING and waiting
      // for a worker thread to process the queue entry) helps retain state
      // transition symmetry with ThreadPool::Shutdown.
      for (auto it = pool_->queue_.begin(); it != pool_->queue_.end();) {
        if (*it == this) {
          it = pool_->queue_.erase(it);
        } else {
          it++;
        }
      }

      if (active_threads_ == 0) {
        Transition(State::QUIESCED);
        break;
      }
      Transition(State::QUIESCING);
      FALLTHROUGH_INTENDED;
    case State::QUIESCING:
      // The token is already quiescing. Just wait for a worker thread to
      // switch it to QUIESCED.
      while (state() != State::QUIESCED) {
        not_running_cond_.Wait();
      }
      break;
    default:
      break;
  }

  // Finally release the queued tasks, outside the lock.
  unique_lock.Unlock();
  for (auto& t : to_release) {
    if (t.trace) {
      t.trace->Release();
    }
  }
}

// Blocks until the token has no queued or running tasks.
void ThreadPoolToken::Wait() {
  MutexLock unique_lock(pool_->lock_);
  pool_->CheckNotPoolThreadUnlocked();
  while (IsActive()) {
    not_running_cond_.Wait();
  }
}

// Like Wait(), but gives up at 'until'. Returns false on timeout.
bool ThreadPoolToken::WaitUntil(const MonoTime& until) {
  MutexLock unique_lock(pool_->lock_);
  pool_->CheckNotPoolThreadUnlocked();
  while (IsActive()) {
    if (!not_running_cond_.WaitUntil(until)) {
      return false;
    }
  }
  return true;
}

bool ThreadPoolToken::WaitFor(const MonoDelta& delta) {
  return WaitUntil(MonoTime::Now() + delta);
}

// Moves the token to 'new_state'. In debug builds, validates the transition
// against the token state machine; always wakes waiters on entry to a
// not-running state. Caller must hold the pool's lock.
void ThreadPoolToken::Transition(State new_state) {
#ifndef NDEBUG
  CHECK_NE(state_, new_state);

  switch (state_) {
    case State::IDLE:
      CHECK(new_state == State::RUNNING ||
            new_state == State::QUIESCED);
      if (new_state == State::RUNNING) {
        CHECK(!entries_.empty());
      } else {
        CHECK(entries_.empty());
        CHECK_EQ(active_threads_, 0);
      }
      break;
    case State::RUNNING:
      CHECK(new_state == State::IDLE ||
            new_state == State::QUIESCING ||
            new_state == State::QUIESCED);
      CHECK(entries_.empty());
      if (new_state == State::QUIESCING) {
        CHECK_GT(active_threads_, 0);
      }
      break;
    case State::QUIESCING:
      CHECK(new_state == State::QUIESCED);
      CHECK_EQ(active_threads_, 0);
      break;
    case State::QUIESCED:
      CHECK(false); // QUIESCED is a terminal state
      break;
    default:
      LOG(FATAL) << "Unknown token state: " << state_;
  }
#endif

  // Take actions based on the state we're entering.
  switch (new_state) {
    case State::IDLE:
    case State::QUIESCED:
      not_running_cond_.Broadcast();
      break;
    default:
      break;
  }

  state_ = new_state;
}

const char* ThreadPoolToken::StateToString(State s) {
  switch (s) {
    case State::IDLE: return "IDLE"; break;
    case State::RUNNING: return "RUNNING"; break;
    case State::QUIESCING: return "QUIESCING"; break;
    case State::QUIESCED: return "QUIESCED"; break;
  }
  return "<cannot reach here>";
}

////////////////////////////////////////////////////////
// ThreadPool
////////////////////////////////////////////////////////

ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
    : name_(builder.name_),
      min_threads_(builder.min_threads_),
      max_threads_(builder.max_threads_),
      max_queue_size_(builder.max_queue_size_),
      idle_timeout_(builder.idle_timeout_),
      pool_status_(Status::Uninitialized("The pool was not initialized.")),
      idle_cond_(&lock_),
      no_threads_cond_(&lock_),
      num_threads_(0),
      num_threads_pending_start_(0),
      active_threads_(0),
      total_queued_tasks_(0),
      tokenless_(NewToken(ExecutionMode::CONCURRENT)),
      metrics_(builder.metrics_) {
  // Trace metric names must be interned because TraceMetrics expects a small,
  // fixed set of counter name pointers.
  string prefix = !builder.trace_metric_prefix_.empty() ?
      builder.trace_metric_prefix_ : builder.name_;

  queue_time_trace_metric_name_ = TraceMetrics::InternName(
      prefix + ".queue_time_us");
  run_wall_time_trace_metric_name_ = TraceMetrics::InternName(
      prefix + ".run_wall_time_us");
  run_cpu_time_trace_metric_name_ = TraceMetrics::InternName(
      prefix + ".run_cpu_time_us");
}

ThreadPool::~ThreadPool() {
  // There should only be one live token: the one used in tokenless submission.
  CHECK_EQ(1, tokens_.size()) << Substitute(
      "Threadpool $0 destroyed with $1 allocated tokens",
      name_, tokens_.size());
  Shutdown();
}

// Starts the minimum number of worker threads. On failure, shuts the pool
// down so no threads are leaked.
Status ThreadPool::Init() {
  if (!pool_status_.IsUninitialized()) {
    return Status::NotSupported("The thread pool is already initialized");
  }
  pool_status_ = Status::OK();
  num_threads_pending_start_ = min_threads_;
  for (int i = 0; i < min_threads_; i++) {
    Status status = CreateThread();
    if (!status.ok()) {
      Shutdown();
      return status;
    }
  }
  return Status::OK();
}

// Rejects new submissions, drops all queued tasks, and waits for all worker
// threads (and in-flight tasks) to finish.
void ThreadPool::Shutdown() {
  MutexLock unique_lock(lock_);
  CheckNotPoolThreadUnlocked();

  // Note: this is the same error seen at submission if the pool is at
  // capacity, so clients can't tell them apart. This isn't really a practical
  // concern though because shutting down a pool typically requires clients to
  // be quiesced first, so there's no danger of a client getting confused.
  pool_status_ = Status::ServiceUnavailable("The pool has been shut down.");

  // Clear the various queues under the lock, but defer the releasing
  // of the tasks outside the lock, in case there are concurrent threads
  // wanting to access the ThreadPool. The task's destructors may acquire
  // locks, etc, so this also prevents lock inversions.
  queue_.clear();
  std::deque<std::deque<Task>> to_release;
  for (auto* t : tokens_) {
    if (!t->entries_.empty()) {
      to_release.emplace_back(std::move(t->entries_));
    }
    switch (t->state()) {
      case ThreadPoolToken::State::IDLE:
        // The token is idle; we can quiesce it immediately.
        t->Transition(ThreadPoolToken::State::QUIESCED);
        break;
      case ThreadPoolToken::State::RUNNING:
        // The token has tasks associated with it. If they're merely queued
        // (i.e. there are no active threads), the tasks will have been removed
        // above and we can quiesce immediately. Otherwise, we need to wait for
        // the threads to finish.
        t->Transition(t->active_threads_ > 0 ?
            ThreadPoolToken::State::QUIESCING :
            ThreadPoolToken::State::QUIESCED);
        break;
      default:
        break;
    }
  }

  // The queues are empty. Wake any sleeping worker threads and wait for all
  // of them to exit. Some worker threads will exit immediately upon waking,
  // while others will exit after they finish executing an outstanding task.
  total_queued_tasks_ = 0;
  while (!idle_threads_.empty()) {
    idle_threads_.front().not_empty.Signal();
    idle_threads_.pop_front();
  }
  while (num_threads_ + num_threads_pending_start_ > 0) {
    no_threads_cond_.Wait();
  }

  // All the threads have exited. Check the state of each token.
  for (auto* t : tokens_) {
    DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
           t->state() == ThreadPoolToken::State::QUIESCED);
  }

  // Finally release the queued tasks, outside the lock.
  unique_lock.Unlock();
  for (auto& token : to_release) {
    for (auto& t : token) {
      if (t.trace) {
        t.trace->Release();
      }
    }
  }
}

unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
  return NewTokenWithMetrics(mode, {});
}

unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
    ExecutionMode mode, ThreadPoolMetrics metrics) {
  MutexLock guard(lock_);
  unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this,
                                                    mode,
                                                    std::move(metrics)));
  InsertOrDie(&tokens_, t.get());
  return t;
}

// Unregisters a token; called from the token's destructor. The token must
// have no queued or running tasks.
void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
  MutexLock guard(lock_);
  CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
                                      ThreadPoolToken::StateToString(t->state()));
  CHECK_EQ(1, tokens_.erase(t));
}

Status ThreadPool::SubmitClosure(Closure c) {
  return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
}

Status ThreadPool::SubmitFunc(boost::function<void()> f) {
  return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
}

// Tokenless submission routes through the pool's internal CONCURRENT token.
Status ThreadPool::Submit(shared_ptr<Runnable> r) {
  return DoSubmit(std::move(r), tokenless_.get());
}

// Core submission path: capacity check, thread-growth decision, enqueue on
// the token, and waking or creating a worker.
Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
  DCHECK(token);
  MonoTime submit_time = MonoTime::Now();

  MutexLock guard(lock_);
  if (PREDICT_FALSE(!pool_status_.ok())) {
    return pool_status_;
  }

  if (PREDICT_FALSE(!token->MaySubmitNewTasks())) {
    return Status::ServiceUnavailable("Thread pool token was shut down");
  }

  // Size limit check.
  int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
                               static_cast<int64_t>(max_queue_size_) - total_queued_tasks_;
  if (capacity_remaining < 1) {
    return Status::ServiceUnavailable(
        Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
                   num_threads_ + num_threads_pending_start_, max_threads_,
                   total_queued_tasks_, max_queue_size_));
  }

  // Should we create another thread?

  // We assume that each current inactive thread will grab one item from the
  // queue. If it seems like we'll need another thread, we create one.
  //
  // Rather than creating the thread here, while holding the lock, we defer
  // it to down below. This is because thread creation can be rather slow
  // (hundreds of milliseconds in some cases) and we'd like to allow the
  // existing threads to continue to process tasks while we do so.
  //
  // In theory, a currently active thread could finish immediately after this
  // calculation but before our new worker starts running. This would mean we
  // created a thread we didn't really need. However, this race is unavoidable
  // and harmless.
  //
  // Of course, we never create more than max_threads_ threads no matter what.
  int threads_from_this_submit =
      token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
  int inactive_threads = num_threads_ + num_threads_pending_start_ - active_threads_;
  int additional_threads = static_cast<int>(queue_.size())
                         + threads_from_this_submit
                         - inactive_threads;
  bool need_a_thread = false;
  if (additional_threads > 0 && num_threads_ + num_threads_pending_start_ < max_threads_) {
    need_a_thread = true;
    num_threads_pending_start_++;
  }

  Task task;
  task.runnable = std::move(r);
  task.trace = Trace::CurrentTrace();
  // Need to AddRef, since the thread which submitted the task may go away,
  // and we don't want the trace to be destructed while waiting in the queue.
  if (task.trace) {
    task.trace->AddRef();
  }
  task.submit_time = submit_time;

  // Add the task to the token's queue.
  ThreadPoolToken::State state = token->state();
  DCHECK(state == ThreadPoolToken::State::IDLE ||
         state == ThreadPoolToken::State::RUNNING);
  token->entries_.emplace_back(std::move(task));
  // A SERIAL token that is already RUNNING must not be re-queued; its current
  // worker will re-queue it when the in-flight task finishes.
  if (state == ThreadPoolToken::State::IDLE ||
      token->mode() == ExecutionMode::CONCURRENT) {
    queue_.emplace_back(token);
    if (state == ThreadPoolToken::State::IDLE) {
      token->Transition(ThreadPoolToken::State::RUNNING);
    }
  }
  int length_at_submit = total_queued_tasks_++;

  // Wake up an idle thread for this task. Choosing the thread at the front of
  // the list ensures LIFO semantics as idling threads are also added to the front.
  //
  // If there are no idle threads, the new task remains on the queue and is
  // processed by an active thread (or a thread we're about to create) at some
  // point in the future.
  if (!idle_threads_.empty()) {
    idle_threads_.front().not_empty.Signal();
    idle_threads_.pop_front();
  }
  guard.Unlock();

  if (metrics_.queue_length_histogram) {
    metrics_.queue_length_histogram->Increment(length_at_submit);
  }
  if (token->metrics_.queue_length_histogram) {
    token->metrics_.queue_length_histogram->Increment(length_at_submit);
  }

  if (need_a_thread) {
    Status status = CreateThread();
    if (!status.ok()) {
      guard.Lock();
      num_threads_pending_start_--;
      if (num_threads_ + num_threads_pending_start_ == 0) {
        // If we have no threads, we can't do any work.
        return status;
      }
      // If we failed to create a thread, but there are still some other
      // worker threads, log a warning message and continue.
      LOG(ERROR) << "Thread pool failed to create thread: "
                 << status.ToString();
    }
  }


  return Status::OK();
}

// Blocks until the pool has no queued or running tasks.
void ThreadPool::Wait() {
  MutexLock unique_lock(lock_);
  CheckNotPoolThreadUnlocked();
  while (total_queued_tasks_ > 0 || active_threads_ > 0) {
    idle_cond_.Wait();
  }
}

bool ThreadPool::WaitUntil(const MonoTime& until) {
  MutexLock unique_lock(lock_);
  CheckNotPoolThreadUnlocked();
  while (total_queued_tasks_ > 0 || active_threads_ > 0) {
    if (!idle_cond_.WaitUntil(until)) {
      return false;
    }
  }
  return true;
}

bool ThreadPool::WaitFor(const MonoDelta& delta) {
  return WaitUntil(MonoTime::Now() + delta);
}

// Worker thread main loop: repeatedly dequeues and runs tasks, idling (and
// possibly exiting, if not a permanent thread) when the queue is empty.
void ThreadPool::DispatchThread() {
  MutexLock unique_lock(lock_);
  InsertOrDie(&threads_, Thread::current_thread());
  DCHECK_GT(num_threads_pending_start_, 0);
  num_threads_++;
  num_threads_pending_start_--;
  // If we are one of the first 'min_threads_' to start, we must be
  // a "permanent" thread.
  bool permanent = num_threads_ <= min_threads_;

  // Owned by this worker thread and added/removed from idle_threads_ as needed.
  IdleThread me(&lock_);

  while (true) {
    // Note: Status::Aborted() is used to indicate normal shutdown.
    if (!pool_status_.ok()) {
      VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
      break;
    }

    if (queue_.empty()) {
      // There's no work to do, let's go idle.
      //
      // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
      idle_threads_.push_front(me);
      SCOPED_CLEANUP({
        // For some wake ups (i.e. Shutdown or DoSubmit) this thread is
        // guaranteed to be unlinked after being awakened. In others (i.e.
        // spurious wake-up or Wait timeout), it'll still be linked.
        if (me.is_linked()) {
          idle_threads_.erase(idle_threads_.iterator_to(me));
        }
      });
      if (permanent) {
        me.not_empty.Wait();
      } else {
        if (!me.not_empty.WaitFor(idle_timeout_)) {
          // After much investigation, it appears that pthread condition variables have
          // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
          // another thread did in fact signal. Apparently after a timeout there is some
          // brief period during which another thread may actually grab the internal mutex
          // protecting the state, signal, and release again before we get the mutex. So,
          // we'll recheck the empty queue case regardless.
          if (queue_.empty()) {
            VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
                    << idle_timeout_.ToMilliseconds() << "ms of idle time.";
            break;
          }
        }
      }
      continue;
    }

    // Get the next token and task to execute.
    ThreadPoolToken* token = queue_.front();
    queue_.pop_front();
    DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
    DCHECK(!token->entries_.empty());
    Task task = std::move(token->entries_.front());
    token->entries_.pop_front();
    token->active_threads_++;
    --total_queued_tasks_;
    ++active_threads_;

    // Run the task without holding the pool lock.
    unique_lock.Unlock();

    // Release the reference which was held by the queued item.
    ADOPT_TRACE(task.trace);
    if (task.trace) {
      task.trace->Release();
    }

    // Update metrics
    MonoTime now(MonoTime::Now());
    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
    TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
    if (metrics_.queue_time_us_histogram) {
      metrics_.queue_time_us_histogram->Increment(queue_time_us);
    }
    if (token->metrics_.queue_time_us_histogram) {
      token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
    }

    // Execute the task
    {
      MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
      MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();

      task.runnable->Run();

      int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
      int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;

      if (metrics_.run_time_us_histogram) {
        metrics_.run_time_us_histogram->Increment(wall_us);
      }
      if (token->metrics_.run_time_us_histogram) {
        token->metrics_.run_time_us_histogram->Increment(wall_us);
      }
      TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
      TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);
    }
    // Destruct the task while we do not hold the lock.
    //
    // The task's destructor may be expensive if it has a lot of bound
    // objects, and we don't want to block submission of the threadpool.
    // In the worst case, the destructor might even try to do something
    // with this threadpool, and produce a deadlock.
    task.runnable.reset();
    unique_lock.Lock();

    // Possible states:
    // 1. The token was shut down while we ran its task. Transition to QUIESCED.
    // 2. The token has no more queued tasks. Transition back to IDLE.
    // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
    ThreadPoolToken::State state = token->state();
    DCHECK(state == ThreadPoolToken::State::RUNNING ||
           state == ThreadPoolToken::State::QUIESCING);
    if (--token->active_threads_ == 0) {
      if (state == ThreadPoolToken::State::QUIESCING) {
        DCHECK(token->entries_.empty());
        token->Transition(ThreadPoolToken::State::QUIESCED);
      } else if (token->entries_.empty()) {
        token->Transition(ThreadPoolToken::State::IDLE);
      } else if (token->mode() == ExecutionMode::SERIAL) {
        queue_.emplace_back(token);
      }
    }
    if (--active_threads_ == 0) {
      idle_cond_.Broadcast();
    }
  }

  // It's important that we hold the lock between exiting the loop and dropping
  // num_threads_. Otherwise it's possible someone else could come along here
  // and add a new task just as the last running thread is about to exit.
  CHECK(unique_lock.OwnsLock());

  CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
  num_threads_--;
  if (num_threads_ + num_threads_pending_start_ == 0) {
    no_threads_cond_.Broadcast();

    // Sanity check: if we're the last thread exiting, the queue ought to be
    // empty. Otherwise it will never get processed.
    CHECK(queue_.empty());
    DCHECK_EQ(0, total_queued_tasks_);
  }
}

// Spawns one detached worker running DispatchThread. Caller must already have
// incremented num_threads_pending_start_.
Status ThreadPool::CreateThread() {
  return kudu::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_),
                              &ThreadPool::DispatchThread, this, nullptr);
}

// Crashes if called from a pool worker; such calls (e.g. Wait or Shutdown
// from inside a task) would deadlock.
void ThreadPool::CheckNotPoolThreadUnlocked() {
  Thread* current = Thread::current_thread();
  if (ContainsKey(threads_, current)) {
    LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with "
        "name '$1' called pool function that would result in deadlock",
        name_, current->name());
  }
}

std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
  return o << ThreadPoolToken::StateToString(s);
}

} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool.h b/be/src/kudu/util/threadpool.h
new file mode 100644
index 0000000..1557486
--- /dev/null
+++ b/be/src/kudu/util/threadpool.h
@@ -0,0 +1,505 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_THREAD_POOL_H
#define KUDU_UTIL_THREAD_POOL_H

#include <deque>
#include <iosfwd>
#include <memory>
#include <string>
#include <unordered_set>

#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
#include <gtest/gtest_prod.h>

#include "kudu/gutil/callback.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"

namespace boost {
template <typename Signature>
class function;
} // namespace boost

namespace kudu {

class Thread;
class ThreadPool;
class ThreadPoolToken;
class Trace;

// Base interface for anything that can be submitted to a ThreadPool.
class Runnable {
 public:
  virtual void Run() = 0;
  virtual ~Runnable() {}
};

// Interesting thread pool metrics. Can be applied to the entire pool (see
// ThreadPoolBuilder) or to individual tokens.
struct ThreadPoolMetrics {
  // Measures the queue length seen by tasks when they enter the queue.
  scoped_refptr<Histogram> queue_length_histogram;

  // Measures the amount of time that tasks spend waiting in a queue.
  scoped_refptr<Histogram> queue_time_us_histogram;

  // Measures the amount of time that tasks spend running.
  scoped_refptr<Histogram> run_time_us_histogram;
};

// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
//
// name: Used for debugging output and default names of the worker threads.
//    Since thread names are limited to 16 characters on Linux, it's good to
//    choose a short name here.
//    Required.
//
// trace_metric_prefix: used to prefix the names of TraceMetric counters.
//    When a task on a thread pool has an associated trace, the thread pool
//    implementation will increment TraceMetric counters to indicate the
//    amount of time spent waiting in the queue as well as the amount of wall
//    and CPU time spent executing. By default, these counters are prefixed
//    with the name of the thread pool. For example, if the pool is named
//    'apply', then counters such as 'apply.queue_time_us' will be
//    incremented.
//
//    The TraceMetrics implementation relies on the number of distinct counter
//    names being small. Thus, if the thread pool name itself is dynamically
//    generated, the default behavior described above would result in an
//    unbounded number of distinct counter names. The 'trace_metric_prefix'
//    setting can be used to override the prefix used in generating the trace
//    metric names.
//
//    For example, the Raft thread pools are named "<tablet id>-raft" which
//    has unbounded cardinality (a server may have thousands of different
//    tablet IDs over its lifetime). In that case, setting the prefix to
//    "raft" will avoid any issues.
//
// min_threads: Minimum number of threads we'll have at any time.
//    Default: 0.
//
// max_threads: Maximum number of threads we'll have at any time.
//    Default: Number of CPUs detected on the system.
//
// max_queue_size: Maximum number of items to enqueue before returning a
//    Status::ServiceUnavailable message from Submit().
//    Default: INT_MAX.
//
// idle_timeout: How long we'll keep around an idle thread before timing it out.
//    We always keep at least min_threads.
//    Default: 500 milliseconds.
//
// metrics: Histograms, counters, etc. to update on various threadpool events.
//    Default: not set.
//
class ThreadPoolBuilder {
 public:
  explicit ThreadPoolBuilder(std::string name);

  // Note: We violate the style guide by returning mutable references here
  // in order to provide traditional Builder pattern conveniences.
  ThreadPoolBuilder& set_trace_metric_prefix(const std::string& prefix);
  ThreadPoolBuilder& set_min_threads(int min_threads);
  ThreadPoolBuilder& set_max_threads(int max_threads);
  ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
  ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
  ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);

  // Instantiate a new ThreadPool with the existing builder arguments.
  Status Build(gscoped_ptr<ThreadPool>* pool) const;

 private:
  friend class ThreadPool;
  const std::string name_;
  std::string trace_metric_prefix_;
  int min_threads_;
  int max_threads_;
  int max_queue_size_;
  MonoDelta idle_timeout_;
  ThreadPoolMetrics metrics_;

  DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
};

// Thread pool with a variable number of threads.
//
// Tasks submitted directly to the thread pool enter a FIFO queue and are
// dispatched to a worker thread when one becomes free. Tasks may also be
// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
// can then be used to block on logical groups of tasks.
//
// A token operates in one of two ExecutionModes, determined at token
// construction time:
// 1. SERIAL: submitted tasks are run one at a time.
// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike
//    tasks submitted without a token, but the logical grouping that tokens
//    impart can be useful when a pool is shared by many contexts (e.g. to
//    safely shut down one context, to derive context-specific metrics, etc.).
//
// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
// processed in a round-robin fashion, one task at a time. This prevents them
// from starving one another. However, tokenless (and CONCURRENT token-based)
// tasks can starve SERIAL token-based tasks.
//
// Usage Example:
//    static void Func(int n) { ... }
//    class Task : public Runnable { ... }
//
//    gscoped_ptr<ThreadPool> thread_pool;
//    CHECK_OK(
//        ThreadPoolBuilder("my_pool")
//            .set_min_threads(0)
//            .set_max_threads(5)
//            .set_max_queue_size(10)
//            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
//            .Build(&thread_pool));
//    thread_pool->Submit(shared_ptr<Runnable>(new Task()));
//    thread_pool->SubmitFunc(boost::bind(&Func, 10));
class ThreadPool {
 public:
  ~ThreadPool();

  // Wait for the running tasks to complete and then shutdown the threads.
  // All the other pending tasks in the queue will be removed.
  // NOTE: That the user may implement an external abort logic for the
  //       runnables, that must be called before Shutdown(), if the system
  //       should know about the non-execution of these tasks, or the runnable
  //       require an explicit "abort" notification to exit from the run loop.
  void Shutdown();

  // Submits a function using the kudu Closure system.
  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;

  // Submits a function bound using boost::bind(&FuncName, args...).
  Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;

  // Submits a Runnable class.
  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;

  // Waits until all the tasks are completed.
  void Wait();

  // Waits for the pool to reach the idle state, or until 'until' time is reached.
  // Returns true if the pool reached the idle state, false otherwise.
  bool WaitUntil(const MonoTime& until);

  // Waits for the pool to reach the idle state, or until 'delta' time elapses.
  // Returns true if the pool reached the idle state, false otherwise.
  bool WaitFor(const MonoDelta& delta);

  // Allocates a new token for use in token-based task submission. All tokens
  // must be destroyed before their ThreadPool is destroyed.
  //
  // There is no limit on the number of tokens that may be allocated.
  enum class ExecutionMode {
    // Tasks submitted via this token will be executed serially.
    SERIAL,

    // Tasks submitted via this token may be executed concurrently.
    CONCURRENT,
  };
  std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode);

  // Like NewToken(), but lets the caller provide metrics for the token. These
  // metrics are incremented/decremented in addition to the configured
  // pool-wide metrics (if any).
  std::unique_ptr<ThreadPoolToken> NewTokenWithMetrics(ExecutionMode mode,
                                                       ThreadPoolMetrics metrics);

  // Return the number of threads currently running (or in the process of starting up)
  // for this thread pool.
  int num_threads() const {
    MutexLock l(lock_);
    return num_threads_ + num_threads_pending_start_;
  }

 private:
  FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
  FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);

  friend class ThreadPoolBuilder;
  friend class ThreadPoolToken;

  // Client-provided task to be executed by this pool.
  struct Task {
    std::shared_ptr<Runnable> runnable;
    // Trace associated with the submitter, if any; ref held while queued.
    Trace* trace;

    // Time at which the entry was submitted to the pool.
    MonoTime submit_time;
  };

  // Creates a new thread pool using a builder.
  explicit ThreadPool(const ThreadPoolBuilder& builder);

  // Initializes the thread pool by starting the minimum number of threads.
  Status Init();

  // Dispatcher responsible for dequeueing and executing the tasks
  void DispatchThread();

  // Create new thread.
  //
  // REQUIRES: caller has incremented 'num_threads_pending_start_' ahead of this call.
  // NOTE: For performance reasons, lock_ should not be held.
  Status CreateThread();

  // Aborts if the current thread is a member of this thread pool.
  void CheckNotPoolThreadUnlocked();

  // Submits a task to be run via token.
  Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);

  // Releases token 't' and invalidates it.
  void ReleaseToken(ThreadPoolToken* t);

  const std::string name_;
  const int min_threads_;
  const int max_threads_;
  const int max_queue_size_;
  const MonoDelta idle_timeout_;

  // Overall status of the pool. Set to an error when the pool is shut down.
  //
  // Protected by 'lock_'.
  Status pool_status_;

  // Synchronizes many of the members of the pool and all of its
  // condition variables.
  mutable Mutex lock_;

  // Condition variable for "pool is idling". Waiters wake up when
  // active_threads_ reaches zero.
  ConditionVariable idle_cond_;

  // Condition variable for "pool has no threads". Waiters wake up when
  // num_threads_ and num_pending_threads_ are both 0.
  ConditionVariable no_threads_cond_;

  // Number of threads currently running.
  //
  // Protected by lock_.
  int num_threads_;

  // Number of threads which are in the process of starting.
  // When these threads start, they will decrement this counter and
  // accordingly increment 'num_threads_'.
  //
  // Protected by lock_.
  int num_threads_pending_start_;

  // Number of threads currently running and executing client tasks.
  //
  // Protected by lock_.
  int active_threads_;

  // Total number of client tasks queued, either directly (queue_) or
  // indirectly (tokens_).
  //
  // Protected by lock_.
  int total_queued_tasks_;

  // All allocated tokens.
  //
  // Protected by lock_.
  std::unordered_set<ThreadPoolToken*> tokens_;

  // FIFO of tokens from which tasks should be executed. Does not own the
  // tokens; they are owned by clients and are removed from the FIFO on shutdown.
  //
  // Protected by lock_.
  std::deque<ThreadPoolToken*> queue_;

  // Pointers to all running threads. Raw pointers are safe because a Thread
  // may only go out of scope after being removed from threads_.
  //
  // Protected by lock_.
  std::unordered_set<Thread*> threads_;

  // List of all threads currently waiting for work.
  //
  // A thread is added to the front of the list when it goes idle and is
  // removed from the front and signaled when new work arrives. This produces a
  // LIFO usage pattern that is more efficient than idling on a single
  // ConditionVariable (which yields FIFO semantics).
  //
  // Protected by lock_.
  struct IdleThread : public boost::intrusive::list_base_hook<> {
    explicit IdleThread(Mutex* m)
        : not_empty(m) {}

    // Condition variable for "queue is not empty". Waiters wake up when a new
    // task is queued.
    ConditionVariable not_empty;

    DISALLOW_COPY_AND_ASSIGN(IdleThread);
  };
  boost::intrusive::list<IdleThread> idle_threads_; // NOLINT(build/include_what_you_use)

  // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
  std::unique_ptr<ThreadPoolToken> tokenless_;

  // Metrics for the entire thread pool.
  const ThreadPoolMetrics metrics_;

  const char* queue_time_trace_metric_name_;
  const char* run_wall_time_trace_metric_name_;
  const char* run_cpu_time_trace_metric_name_;

  DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

// Entry point for token-based task submission and blocking for a particular
// thread pool. Tokens can only be created via ThreadPool::NewToken().
//
// All functions are thread-safe. Mutable members are protected via the
// ThreadPool's lock.
class ThreadPoolToken {
 public:
  // Destroys the token.
  //
  // May be called on a token with outstanding tasks, as Shutdown() will be
  // called first to take care of them.
  ~ThreadPoolToken();

  // Submits a function using the kudu Closure system.
  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;

  // Submits a function bound using boost::bind(&FuncName, args...).
  Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;

  // Submits a Runnable class.
  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;

  // Marks the token as unusable for future submissions.
Any queued tasks not + // yet running are destroyed. If tasks are in flight, Shutdown() will wait + // on their completion before returning. + void Shutdown(); + + // Waits until all the tasks submitted via this token are completed. + void Wait(); + + // Waits for all submissions using this token are complete, or until 'until' + // time is reached. + // + // Returns true if all submissions are complete, false otherwise. + bool WaitUntil(const MonoTime& until); + + // Waits for all submissions using this token are complete, or until 'delta' + // time elapses. + // + // Returns true if all submissions are complete, false otherwise. + bool WaitFor(const MonoDelta& delta); + + private: + // All possible token states. Legal state transitions: + // IDLE -> RUNNING: task is submitted via token + // IDLE -> QUIESCED: token or pool is shut down + // RUNNING -> IDLE: worker thread finishes executing a task and + // there are no more tasks queued to the token + // RUNNING -> QUIESCING: token or pool is shut down while worker thread + // is executing a task + // RUNNING -> QUIESCED: token or pool is shut down + // QUIESCING -> QUIESCED: worker thread finishes executing a task + // belonging to a shut down token or pool + enum class State { + // Token has no queued tasks. + IDLE, + + // A worker thread is running one of the token's previously queued tasks. + RUNNING, + + // No new tasks may be submitted to the token. A worker thread is still + // running a previously queued task. + QUIESCING, + + // No new tasks may be submitted to the token. There are no active tasks + // either. At this state, the token may only be destroyed. + QUIESCED, + }; + + // Writes a textual representation of the token state in 's' to 'o'. + friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s); + + friend class ThreadPool; + + // Returns a textual representation of 's' suitable for debugging. + static const char* StateToString(State s); + + // Constructs a new token. 
+ // + // The token may not outlive its thread pool ('pool'). + ThreadPoolToken(ThreadPool* pool, + ThreadPool::ExecutionMode mode, + ThreadPoolMetrics metrics); + + // Changes this token's state to 'new_state' taking actions as needed. + void Transition(State new_state); + + // Returns true if this token has a task queued and ready to run, or if a + // task belonging to this token is already running. + bool IsActive() const { + return state_ == State::RUNNING || + state_ == State::QUIESCING; + } + + // Returns true if new tasks may be submitted to this token. + bool MaySubmitNewTasks() const { + return state_ != State::QUIESCING && + state_ != State::QUIESCED; + } + + State state() const { return state_; } + ThreadPool::ExecutionMode mode() const { return mode_; } + + // Token's configured execution mode. + const ThreadPool::ExecutionMode mode_; + + // Metrics for just this token. + const ThreadPoolMetrics metrics_; + + // Pointer to the token's thread pool. + ThreadPool* pool_; + + // Token state machine. + State state_; + + // Queued client tasks. + std::deque<ThreadPool::Task> entries_; + + // Condition variable for "token is idle". Waiters wake up when the token + // transitions to IDLE or QUIESCED. + ConditionVariable not_running_cond_; + + // Number of worker threads currently executing tasks belonging to this + // token. + int active_threads_; + + DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken); +}; + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/throttler-test.cc b/be/src/kudu/util/throttler-test.cc new file mode 100644 index 0000000..ff97eb5 --- /dev/null +++ b/be/src/kudu/util/throttler-test.cc @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/throttler.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+class ThrottlerTest : public KuduTest {
+};
+
+TEST_F(ThrottlerTest, TestOpThrottle) {
+  // Check operation rate throttling: 1000 ops/sec with burst factor 1 means
+  // each 100ms refill period grants (and caps the bucket at) 100 op tokens.
+  MonoTime now = MonoTime::Now();
+  Throttler t0(now, 1000, 1000*1000, 1);
+  // Fill up bucket
+  now += MonoDelta::FromMilliseconds(2000);
+  // Check throttle behavior for 1 second.
+  for (int p = 0; p < 10; p++) {
+    for (int i = 0; i < 100; i++) {
+      ASSERT_TRUE(t0.Take(now, 1, 1));
+    }
+    // The 101st op within the same refill period must be rejected.
+    ASSERT_FALSE(t0.Take(now, 1, 1));
+    now += MonoDelta::FromMilliseconds(100);
+  }
+}
+
+TEST_F(ThrottlerTest, TestIOThrottle) {
+  // Check IO byte rate throttling. The op budget (50000/sec) is never
+  // exhausted here; only the 1MB/sec byte budget (100KB per period) throttles.
+  MonoTime now = MonoTime::Now();
+  Throttler t0(now, 50000, 1000*1000, 1);
+  // Fill up bucket
+  now += MonoDelta::FromMilliseconds(2000);
+  // Check throttle behavior for 1 second.
+  for (int p = 0; p < 10; p++) {
+    for (int i = 0; i < 100; i++) {
+      ASSERT_TRUE(t0.Take(now, 1, 1000));
+    }
+    // 100 * 1000 bytes drains the period's budget; the next take fails.
+    ASSERT_FALSE(t0.Take(now, 1, 1000));
+    now += MonoDelta::FromMilliseconds(100);
+  }
+}
+
+TEST_F(ThrottlerTest, TestBurst) {
+  // Check burst behavior: burst factor 5 lets up to 5 refill periods' worth
+  // of tokens accumulate (token_max = 5 * per-period refill), so a large
+  // take can succeed right after a string of small ones.
+  MonoTime now = MonoTime::Now();
+  Throttler t0(now, 2000, 1000*1000, 5);
+  // Fill up bucket
+  now += MonoDelta::FromMilliseconds(2000);
+  for (int i = 0; i < 100; i++) {
+    now += MonoDelta::FromMilliseconds(1);
+    ASSERT_TRUE(t0.Take(now, 1, 5000));
+  }
+  // Accumulated burst capacity covers one big 100KB take...
+  ASSERT_TRUE(t0.Take(now, 1, 100000));
+  // ...after which even a single byte is throttled.
+  ASSERT_FALSE(t0.Take(now, 1, 1));
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler.cc b/be/src/kudu/util/throttler.cc
new file mode 100644
index 0000000..69e0f99
--- /dev/null
+++ b/be/src/kudu/util/throttler.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/throttler.h"
+
+#include <algorithm>
+#include <mutex>
+
+namespace kudu {
+
+// Token-bucket throttler: each refill period credits a fixed number of op and
+// byte tokens, capped at a burst ceiling; Take() spends tokens atomically.
+Throttler::Throttler(MonoTime now, uint64_t op_rate, uint64_t byte_rate, double burst_factor) :
+    next_refill_(now) {
+  // Convert per-second rates into per-refill-period grants. Note the integer
+  // division: a rate below one token per period truncates to a refill of 0,
+  // which Take() treats as "this dimension of throttling is disabled".
+  op_refill_ = op_rate / (MonoTime::kMicrosecondsPerSecond / kRefillPeriodMicros);
+  op_token_ = 0;
+  op_token_max_ = static_cast<uint64_t>(op_refill_ * burst_factor);
+  byte_refill_ = byte_rate / (MonoTime::kMicrosecondsPerSecond / kRefillPeriodMicros);
+  byte_token_ = 0;
+  byte_token_max_ = static_cast<uint64_t>(byte_refill_ * burst_factor);
+}
+
+bool Throttler::Take(MonoTime now, uint64_t op, uint64_t byte) {
+  // Fast path: both dimensions disabled. The refill amounts are written only
+  // in the constructor, so reading them without the lock here is safe.
+  if (op_refill_ == 0 && byte_refill_ == 0) {
+    return true;
+  }
+  std::lock_guard<simple_spinlock> lock(lock_);
+  Refill(now);
+  // All-or-nothing: tokens are consumed only if BOTH the op and byte budgets
+  // can cover the request; otherwise nothing is taken and the caller is
+  // throttled. A disabled dimension (refill == 0) always passes its check.
+  if ((op_refill_ == 0 || op <= op_token_) &&
+      (byte_refill_ == 0 || byte <= byte_token_)) {
+    if (op_refill_ > 0) {
+      op_token_ -= op;
+    }
+    if (byte_refill_ > 0) {
+      byte_token_ -= byte;
+    }
+    return true;
+  }
+  return false;
+}
+
+// Credits tokens for every refill period elapsed up to 'now'.
+// Caller must hold lock_.
+void Throttler::Refill(MonoTime now) {
+  int64_t d = (now - next_refill_).ToMicroseconds();
+  if (d < 0) {
+    // The next scheduled refill is still in the future; nothing to credit.
+    return;
+  }
+  // Credit the refill that is due now plus any fully elapsed periods, then
+  // clamp each balance to its burst capacity.
+  uint64_t num_period = d / kRefillPeriodMicros + 1;
+  next_refill_ += MonoDelta::FromMicroseconds(num_period * kRefillPeriodMicros);
+  op_token_ += num_period * op_refill_;
+  op_token_ = std::min(op_token_, op_token_max_);
+  byte_token_ += num_period * byte_refill_;
+  byte_token_ = std::min(byte_token_, byte_token_max_);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler.h b/be/src/kudu/util/throttler.h
new file mode 100644
index 0000000..5594091
--- /dev/null
+++ b/be/src/kudu/util/throttler.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THROTTLER_H
+#define KUDU_UTIL_THROTTLER_H
+
+#include <cstdint>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+// A throttler to throttle both operation/s and IO byte/s.
+// Implemented as a pair of token buckets (ops and bytes) refilled at a
+// fixed period; all methods are thread-safe via an internal spinlock.
+class Throttler {
+ public:
+  // Refill period is 100ms.
+  enum {
+    kRefillPeriodMicros = 100000
+  };
+
+  // Construct a throttler with max operation per second, max IO bytes per second
+  // and burst factor (burst_rate = rate * burst), burst rate means maximum
+  // throughput within one refill period.
+  // Set op_per_sec to 0 to disable operation throttling.
+  // Set byte_per_sec to 0 to disable IO bytes throttling.
+  // NOTE(review): rates below 10/sec truncate to a zero per-period refill,
+  // which also disables that dimension — confirm this is intended.
+  Throttler(MonoTime now, uint64_t op_per_sec, uint64_t byte_per_sec, double burst_factor);
+
+  // Throttle an "operation group" by taking 'op' operation tokens and 'byte' byte tokens.
+  // Return true if there are enough tokens, and operation is allowed.
+  // Return false if there are not enough tokens, and operation is throttled.
+  // Tokens are consumed all-or-nothing: a throttled call takes none.
+  bool Take(MonoTime now, uint64_t op, uint64_t byte);
+
+ private:
+  // Credits tokens for every refill period elapsed up to 'now', clamped to
+  // the burst capacity. Must be called with lock_ held.
+  void Refill(MonoTime now);
+
+  // Time at which the next refill period is due.
+  MonoTime next_refill_;
+  // Operation budget: tokens granted per refill period, current balance,
+  // and burst-capacity cap. A refill of 0 disables this budget.
+  uint64_t op_refill_;
+  uint64_t op_token_;
+  uint64_t op_token_max_;
+  // IO byte budget: same three quantities as above.
+  uint64_t byte_refill_;
+  uint64_t byte_token_;
+  uint64_t byte_token_max_;
+  // Guards all mutable state above.
+  simple_spinlock lock_;
+};
+
+} // namespace kudu
+
+#endif