This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new b60b724ce [threadpool] KUDU-3364 Add TimerThread for thread pool
b60b724ce is described below
commit b60b724ce033a8956508701a7d58b94a60184313
Author: shenxingwuying <[email protected]>
AuthorDate: Mon Apr 25 15:15:52 2022 +0800
[threadpool] KUDU-3364 Add TimerThread for thread pool
Add TimerThread to ThreadPool to support
executing tasks asynchronously and with a delay
I plan to use this feature to add tools that trigger data rebalancing,
leader rebalancing, and some kinds of compaction ops. In addition, some
dedicated single-purpose threads could be refactored to share one thread pool.
More details at KUDU-3364.
Change-Id: If51fce48ae6a45a0bf3314f8a102ee373c5c442e
Reviewed-on: http://gerrit.cloudera.org:8080/18447
Tested-by: Kudu Jenkins
Reviewed-by: Yingchun Lai <[email protected]>
---
src/kudu/util/threadpool-test.cc | 79 ++++++++++++++++++++++++++++++
src/kudu/util/threadpool.cc | 102 +++++++++++++++++++++++++++++++++++++--
src/kudu/util/threadpool.h | 81 +++++++++++++++++++++++++++++++
3 files changed, 257 insertions(+), 5 deletions(-)
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 1505a497c..31e57723c 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -90,6 +90,15 @@ class ThreadPoolTest : public KuduTest {
.Build(&pool_);
}
+ // Rebuilds 'pool_' with the given thread limits and a scheduler thread
+ // enabled (100ms check period), for testing delayed-task scheduling.
+ Status RebuildPoolWithScheduler(int min_threads, int max_threads) {
+ return ThreadPoolBuilder(kDefaultPoolName)
+ .set_min_threads(min_threads)
+ .set_max_threads(max_threads)
+ .set_enable_scheduler()
+ .set_schedule_period_ms(100)
+ .Build(&pool_);
+ }
+
protected:
unique_ptr<ThreadPool> pool_;
};
@@ -97,6 +106,8 @@ class ThreadPoolTest : public KuduTest {
TEST_F(ThreadPoolTest, TestNoTaskOpenClose) {
ASSERT_OK(RebuildPoolWithMinMax(4, 4));
pool_->Shutdown();
+ ASSERT_OK(RebuildPoolWithScheduler(4, 4));
+ pool_->Shutdown();
}
static void SimpleTaskMethod(int n, Atomic32* counter) {
@@ -122,6 +133,8 @@ class SimpleTask {
};
TEST_F(ThreadPoolTest, TestSimpleTasks) {
+ constexpr int kDelayMs = 1000;
+
ASSERT_OK(RebuildPoolWithMinMax(4, 4));
Atomic32 counter(0);
@@ -134,6 +147,20 @@ TEST_F(ThreadPoolTest, TestSimpleTasks) {
ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(123, &counter); }));
pool_->Wait();
ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&counter));
+
+ ASSERT_OK(RebuildPoolWithScheduler(4, 4));
+ unique_ptr<ThreadPoolToken> token =
pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ MonoTime start = MonoTime::Now();
+ ASSERT_OK(token->Schedule([&counter]() {
+ SimpleTaskMethod(13, &counter);
+ }, kDelayMs));
+
+ pool_->WaitForScheduler();
+ ASSERT_EQ(10 + 15 + 20 + 15 + 123 + 13,
base::subtle::NoBarrier_Load(&counter));
+ MonoDelta delta = MonoTime::Now() - start;
+ MonoDelta expect_upper_limit =
MonoDelta::FromMilliseconds(static_cast<int>(kDelayMs * 1.2));
+ MonoDelta expect_lower_limit =
MonoDelta::FromMilliseconds(static_cast<int>(kDelayMs * 0.9));
+ ASSERT_TRUE(delta.MoreThan(expect_lower_limit) &&
delta.LessThan(expect_upper_limit));
}
static void IssueTraceStatement() {
@@ -160,6 +187,11 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
Status s = pool_->Submit(&IssueTraceStatement);
ASSERT_EQ("Service unavailable: The pool has been shut down.",
s.ToString());
+
+ ASSERT_OK(RebuildPoolWithScheduler(1, 1));
+ unique_ptr<ThreadPoolToken> token =
pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ pool_->Shutdown();
+ ASSERT_TRUE(token->Schedule(&IssueTraceStatement,
1000).IsServiceUnavailable());
}
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
@@ -198,6 +230,53 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
ASSERT_EQ(0, pool_->num_threads());
}
+TEST_F(ThreadPoolTest, TestThreadPoolWithSchedulerAndNoMinimum) {
+ constexpr int kIdleTimeoutMs = 1;
+ constexpr int kDelayMs = 1000;
+ ASSERT_OK(RebuildPoolWithBuilder(
+ ThreadPoolBuilder(kDefaultPoolName)
+ .set_min_threads(0)
+ .set_max_threads(4)
+ .set_enable_scheduler()
+ .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
+ // There are no threads to start with.
+ ASSERT_EQ(0, pool_->active_threads_);
+ ASSERT_EQ(0, pool_->num_threads());
+
+ CountDownLatch latch(1);
+ SCOPED_CLEANUP({
+ latch.CountDown();
+ });
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+ ASSERT_EQ(4, pool_->num_threads());
+ unique_ptr<ThreadPoolToken> token =
pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ ASSERT_OK(token->Submit([&latch]() { latch.Wait(); }));
+ ASSERT_EQ(4, pool_->num_threads());
+
+ ASSERT_OK(token->Schedule([&latch]() { latch.Wait(); }, kDelayMs));
+ ASSERT_OK(token->Schedule([&latch]() { latch.Wait(); },
static_cast<int>(kDelayMs * 1.2)));
+ ASSERT_EQ(4, pool_->num_threads());
+ latch.CountDown();
+ pool_->Wait();
+
+ latch.Reset(1);
+ ASSERT_EQ(0, pool_->num_active_threads());
+ SleepFor(MonoDelta::FromMilliseconds(static_cast<int>(kDelayMs * 1.5)));
+ ASSERT_GT(pool_->num_active_threads(), 0);
+ ASSERT_OK(token->Schedule([&latch]() { latch.Wait(); }, kDelayMs));
+ latch.CountDown();
+ pool_->Wait();
+ ASSERT_EQ(0, pool_->num_active_threads());
+ SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
+ ASSERT_EQ(0, pool_->num_threads());
+ ASSERT_EQ(0, pool_->num_active_threads());
+ pool_->Shutdown();
+ ASSERT_EQ(nullptr, pool_->scheduler());
+}
+
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
// By default a threadpool's max_threads is set to the number of CPUs, so
// this test submits more tasks than that to ensure that the number of CPUs
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 6297432c7..95c16178c 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -17,14 +17,14 @@
#include "kudu/util/threadpool.h"
-#include <algorithm>
-#include <cstdint>
#include <deque>
#include <functional>
#include <limits>
#include <memory>
#include <ostream>
#include <string>
+#include <utility>
+#include <vector>
#include <glog/logging.h>
@@ -41,9 +41,9 @@
namespace kudu {
using std::deque;
-using std::shared_ptr;
using std::string;
using std::unique_ptr;
+using std::vector;
using strings::Substitute;
////////////////////////////////////////////////////////
@@ -55,7 +55,9 @@ ThreadPoolBuilder::ThreadPoolBuilder(string name)
min_threads_(0),
max_threads_(base::NumCPUs()),
max_queue_size_(std::numeric_limits<int>::max()),
- idle_timeout_(MonoDelta::FromMilliseconds(500)) {}
+ idle_timeout_(MonoDelta::FromMilliseconds(500)),
+ enable_scheduler_(false),
+ schedule_period_ms_(100) {}
ThreadPoolBuilder& ThreadPoolBuilder::set_trace_metric_prefix(const string&
prefix) {
trace_metric_prefix_ = prefix;
@@ -89,6 +91,16 @@ ThreadPoolBuilder&
ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
return *this;
}
+ThreadPoolBuilder& ThreadPoolBuilder::set_enable_scheduler() {
+ enable_scheduler_ = true;
+ return *this;
+}
+
+ThreadPoolBuilder& ThreadPoolBuilder::set_schedule_period_ms(uint32_t
schedule_period_ms) {
+ schedule_period_ms_ = schedule_period_ms;
+ return *this;
+}
+
ThreadPoolBuilder& ThreadPoolBuilder::set_queue_overload_threshold(
const MonoDelta& threshold) {
queue_overload_threshold_ = threshold;
@@ -100,6 +112,50 @@ Status ThreadPoolBuilder::Build(unique_ptr<ThreadPool>*
pool) const {
return (*pool)->Init();
}
+SchedulerThread::SchedulerThread(string thread_pool_name, uint32_t
schedule_period_ms)
+ : thread_pool_name_(std::move(thread_pool_name)),
+ schedule_period_ms_(schedule_period_ms),
+ shutdown_(1) {}
+
+// Stops the scheduler thread if Start() was ever called successfully
+// (thread_ is only set by a successful Thread::Create).
+SchedulerThread::~SchedulerThread() {
+ if (thread_) {
+ Shutdown();
+ }
+}
+
+// Spawns the single scheduler thread; it runs RunLoop() until Shutdown().
+Status SchedulerThread::Start() {
+ return Thread::Create(
+ thread_pool_name_, "scheduler", [this]() { this->RunLoop(); }, &thread_);
+}
+
+// Stops the scheduler thread (if running) and drops all pending tasks.
+// Safe to call more than once; always returns Status::OK().
+Status SchedulerThread::Shutdown() {
+ if (thread_) {
+ shutdown_.CountDown();
+ thread_->Join();
+ }
+ // The header contract promises pending tasks are cleared on shutdown;
+ // drop them here so their captured resources are released promptly
+ // rather than lingering until the SchedulerThread is destroyed.
+ MutexLock auto_lock(mutex_);
+ tasks_.clear();
+ return Status::OK();
+}
+
+// Main loop of the scheduler thread: wakes up every schedule_period_ms_,
+// collects all tasks whose execution time has passed, and submits them to
+// their tokens. Exits when shutdown_ is counted down.
+void SchedulerThread::RunLoop() {
+ while (!shutdown_.WaitFor(MonoDelta::FromMilliseconds(schedule_period_ms_))) {
+ MonoTime now = MonoTime::Now();
+ vector<SchedulerTask> pending_tasks;
+ {
+ // Move due tasks out under the lock, then submit outside it so task
+ // submission never blocks new Schedule() calls.
+ MutexLock auto_lock(mutex_);
+ auto upper_it = tasks_.upper_bound(now);
+ for (auto it = tasks_.begin(); it != upper_it; it++) {
+ pending_tasks.emplace_back(std::move(it->second));
+ }
+ tasks_.erase(tasks_.begin(), upper_it);
+ }
+
+ for (auto& task : pending_tasks) {
+ ThreadPoolToken* token = task.thread_pool_token_;
+ // The token may legitimately have been shut down after the task was
+ // scheduled, in which case Submit() returns ServiceUnavailable.
+ // Crashing the process (CHECK_OK) would be too harsh for that benign
+ // race, so log and drop the task instead. Move the functor rather
+ // than copying it.
+ Status s = token->Submit(std::move(task.f));
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(WARNING) << "Failed to submit scheduled task for pool "
+ << thread_pool_name_ << ": " << s.ToString();
+ }
+ }
+ }
+}
+
////////////////////////////////////////////////////////
// ThreadPoolToken
////////////////////////////////////////////////////////
@@ -184,6 +240,18 @@ void ThreadPoolToken::Shutdown() {
}
}
+// Submits a task to run after approximately 'delay_ms' milliseconds.
+// Actual execution may lag by up to the scheduler's check period.
+Status ThreadPoolToken::Schedule(std::function<void()> f, int64_t delay_ms) {
+  if (PREDICT_FALSE(!MaySubmitNewTasks())) {
+    return Status::ServiceUnavailable("Thread pool token was shut down");
+  }
+  CHECK(mode() == ThreadPool::ExecutionMode::SERIAL);
+  SchedulerThread* scheduler = pool_->scheduler();
+  // Guard against pools built without set_enable_scheduler(): there is no
+  // scheduler thread to run delayed tasks, and dereferencing the null
+  // pointer would crash.
+  if (PREDICT_FALSE(scheduler == nullptr)) {
+    return Status::NotSupported("Thread pool has no scheduler thread");
+  }
+  MonoTime execute_time = MonoTime::Now();
+  execute_time.AddDelta(MonoDelta::FromMilliseconds(delay_ms));
+  scheduler->Schedule(this, std::move(f), execute_time);
+  return Status::OK();
+}
+
void ThreadPoolToken::Wait() {
MutexLock unique_lock(pool_->lock_);
pool_->CheckNotPoolThreadUnlocked();
@@ -284,7 +352,10 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
active_threads_(0),
total_queued_tasks_(0),
tokenless_(NewToken(ExecutionMode::CONCURRENT)),
- metrics_(builder.metrics_) {
+ metrics_(builder.metrics_),
+ scheduler_(nullptr),
+ schedule_period_ms_(builder.schedule_period_ms_),
+ enable_scheduler_(builder.enable_scheduler_) {
string prefix = !builder.trace_metric_prefix_.empty() ?
builder.trace_metric_prefix_ : builder.name_;
@@ -322,6 +393,10 @@ Status ThreadPool::Init() {
return status;
}
}
+ if (enable_scheduler_) {
+ scheduler_ = new SchedulerThread(name_, schedule_period_ms_);
+ RETURN_NOT_OK(scheduler_->Start());
+ }
return Status::OK();
}
@@ -329,6 +404,11 @@ void ThreadPool::Shutdown() {
MutexLock unique_lock(lock_);
CheckNotPoolThreadUnlocked();
+ if (scheduler_) {
+ scheduler_->Shutdown();
+ delete scheduler_;
+ scheduler_ = nullptr;
+ }
// Note: this is the same error seen at submission if the pool is at
// capacity, so clients can't tell them apart. This isn't really a practical
// concern though because shutting down a pool typically requires clients to
@@ -559,6 +639,18 @@ void ThreadPool::Wait() {
}
}
+// Waits until all queued/running tasks AND the scheduler's pending delayed
+// tasks have completed. Wait() ignores scheduler tasks; tests that exercise
+// Schedule() use this variant instead.
+void ThreadPool::WaitForScheduler() {
+ MutexLock unique_lock(lock_);
+ CheckNotPoolThreadUnlocked();
+ // Only wait on the scheduler queue if this pool was built with a
+ // scheduler thread; otherwise behave exactly like Wait().
+ bool wait_scheduler = (scheduler_ != nullptr);
+ // NOTE(review): there appears to be a window in which the scheduler has
+ // already removed a due task from its queue (empty() == true) but has not
+ // yet submitted it to the pool (total_queued_tasks_ still 0); all three
+ // conditions could then be momentarily false and this method could return
+ // before that task runs -- confirm.
+ while (total_queued_tasks_ > 0 || active_threads_ > 0 ||
+ (wait_scheduler && !scheduler_->empty())) {
+ idle_cond_.Wait();
+ }
+}
+
bool ThreadPool::WaitUntil(const MonoTime& until) {
MutexLock unique_lock(lock_);
CheckNotPoolThreadUnlocked();
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index ff1fae83c..8c68a88fa 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -20,12 +20,15 @@
#include <atomic>
#include <cstddef>
+#include <cstdint>
#include <deque>
#include <functional>
#include <iosfwd>
+#include <map>
#include <memory>
#include <string>
#include <unordered_set>
+#include <utility>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
@@ -35,6 +38,7 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/condition_variable.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
@@ -119,6 +123,8 @@ class ThreadPoolBuilder {
ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
ThreadPoolBuilder& set_queue_overload_threshold(const MonoDelta& threshold);
ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
+ ThreadPoolBuilder& set_enable_scheduler();
+ ThreadPoolBuilder& set_schedule_period_ms(uint32_t schedule_period_ms);
// Instantiate a new ThreadPool with the existing builder arguments.
Status Build(std::unique_ptr<ThreadPool>* pool) const;
@@ -133,10 +139,58 @@ class ThreadPoolBuilder {
MonoDelta idle_timeout_;
MonoDelta queue_overload_threshold_;
ThreadPoolMetrics metrics_;
+ bool enable_scheduler_;
+ uint32_t schedule_period_ms_;
DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
};
+
+// SchedulerThread executes delayed tasks asynchronously on behalf of a
+// ThreadPool: tasks wait in a time-ordered queue and are submitted to
+// their ThreadPoolToken once their execution time has passed.
+class SchedulerThread {
+public:
+ explicit SchedulerThread(std::string thread_pool_name, uint32_t
schedule_period_ms);
+
+ ~SchedulerThread();
+
+ // Start a thread which periodically decides which tasks can be executed now.
+ Status Start();
+
+ // Shutdown the thread and clear the pending tasks.
+ Status Shutdown();
+
+ // A delayed task: the functor to run and the token it is submitted to.
+ struct SchedulerTask {
+ ThreadPoolToken* thread_pool_token_;
+ std::function<void()> f;
+ };
+
+ // Enqueues 'f' to run on 'token' no earlier than 'execute_time'.
+ // Actual execution may lag by up to schedule_period_ms_.
+ void Schedule(ThreadPoolToken* token, std::function<void()> f, const
MonoTime& execute_time) {
+ MutexLock unique_lock(mutex_);
+ tasks_.insert({execute_time, SchedulerTask({token, std::move(f)})});
+ }
+
+ // Returns true if no tasks are waiting to be scheduled.
+ bool empty() const {
+ MutexLock unique_lock(mutex_);
+ return tasks_.empty();
+ }
+
+private:
+ friend class ThreadPool;
+ friend class ThreadPoolToken;
+
+ void RunLoop();
+
+ const std::string thread_pool_name_;
+ // Period, in milliseconds, at which RunLoop() checks for due tasks.
+ const uint32_t schedule_period_ms_;
+ CountDownLatch shutdown_;
+ // Protects `tasks_` against concurrent access.
+ mutable Mutex mutex_;
+
+ scoped_refptr<Thread> thread_;
+ // Pending tasks, keyed (and ordered) by their earliest execution time.
+ std::multimap<MonoTime, SchedulerTask> tasks_;
+};
+
// Thread pool with a variable number of threads.
//
// Tasks submitted directly to the thread pool enter a FIFO queue and are
@@ -230,7 +284,9 @@ class ThreadPool {
MonoDelta* threshold = nullptr) const;
private:
+ FRIEND_TEST(ThreadPoolTest, TestSimpleTasks);
FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
+ FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithSchedulerAndNoMinimum);
FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
friend class ThreadPoolBuilder;
@@ -410,6 +466,20 @@ class ThreadPool {
// * a new task has been scheduled (i.e. added into the queue)
void NotifyLoadMeterUnlocked(const MonoDelta& queue_time = MonoDelta());
+ SchedulerThread* scheduler() {
+ return scheduler_;
+ }
+
+ // Waits until all the tasks and scheduler's tasks completed.
+ void WaitForScheduler();
+
+ // Return the number of threads currently running tasks for this pool.
+ // Used by tests to read active_threads_ under the lock, avoiding TSAN
+ // data-race reports.
+ int num_active_threads() {
+ MutexLock l(lock_);
+ return active_threads_;
+ }
+
const std::string name_;
const int min_threads_;
const int max_threads_;
@@ -503,6 +573,13 @@ class ThreadPool {
// Metrics for the entire thread pool.
const ThreadPoolMetrics metrics_;
+ // SchedulerThread is used for scenarios such as executing a task
+ // after a delay. Owned; destroyed in Shutdown().
+ SchedulerThread* scheduler_;
+ uint32_t schedule_period_ms_;
+
+ bool enable_scheduler_;
+
const char* queue_time_trace_metric_name_;
const char* run_wall_time_trace_metric_name_;
const char* run_cpu_time_trace_metric_name_;
@@ -526,6 +603,9 @@ class ThreadPoolToken {
// Submits a new task.
Status Submit(std::function<void()> f) WARN_UNUSED_RESULT;
+ // Submits a task to be executed after a delay of approximately delay_ms.
+ Status Schedule(std::function<void()> f, int64_t delay_ms)
WARN_UNUSED_RESULT;
+
// Marks the token as unusable for future submissions. Any queued tasks not
// yet running are destroyed. If tasks are in flight, Shutdown() will wait
// on their completion before returning.
@@ -547,6 +627,7 @@ class ThreadPoolToken {
bool WaitFor(const MonoDelta& delta);
private:
+ friend class SchedulerThread;
// All possible token states. Legal state transitions:
// IDLE -> RUNNING: task is submitted via token
// IDLE -> QUIESCED: token or pool is shut down