This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 4e5cd00b6 [util] introduce ThreadPoolToken::Close()
4e5cd00b6 is described below
commit 4e5cd00b62e6be94e410abf98dda6fb066c6e1bb
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Mar 11 23:27:19 2025 -0700
[util] introduce ThreadPoolToken::Close()
This patch introduces a means to protect a thread pool token from
submission of new tasks, while allowing all of its queued tasks to
complete before the token reaches the terminal QUIESCED state. That's the
main difference from ThreadPoolToken::Shutdown(), where all the token's
queued but not-yet-in-flight tasks are thrown out of the thread pool's
queue. From this perspective, the newly introduced Close() method can
be considered a graceful and asynchronous version of its Shutdown()
counterpart.
This patch also adds a few test scenarios to cover the crucial parts
of the newly introduced functionality.
Change-Id: Ic226319c191af666233ec735c1787389d051b039
Reviewed-on: http://gerrit.cloudera.org:8080/22614
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/util/threadpool-test.cc | 233 ++++++++++++++++++++++++++++++++++++++-
src/kudu/util/threadpool.cc | 78 ++++++++++---
src/kudu/util/threadpool.h | 68 +++++++++---
3 files changed, 351 insertions(+), 28 deletions(-)
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 35708ee0c..449cc672c 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -21,7 +21,9 @@
#include <algorithm>
#include <atomic>
+#include <cstddef>
#include <cstdint>
+#include <deque>
#include <functional>
#include <iterator>
#include <limits>
@@ -1009,7 +1011,25 @@ TEST_F(ThreadPoolTest, TestSlowDestructor) {
// For test cases that should run with both kinds of tokens.
class ThreadPoolTestTokenTypes : public ThreadPoolTest,
- public
testing::WithParamInterface<ThreadPool::ExecutionMode> {};
+ public
testing::WithParamInterface<ThreadPool::ExecutionMode> {
+ protected:
+ static bool IsTokenActive(const ThreadPoolToken& t) {
+ std::lock_guard unique_lock(t.pool_->lock_);
+ return t.IsActive();
+ }
+ static bool IsTokenQueueEmpty(const ThreadPoolToken& t) {
+ std::lock_guard unique_lock(t.pool_->lock_);
+ return t.entries_.empty();
+ }
+ static bool IsTokenClosed(const ThreadPoolToken& t) {
+ std::lock_guard unique_lock(t.pool_->lock_);
+ return t.state() == ThreadPoolToken::State::GRACEFUL_QUIESCING;
+ }
+ static bool IsTokenShutDown(const ThreadPoolToken& t) {
+ std::lock_guard unique_lock(t.pool_->lock_);
+ return t.state() == ThreadPoolToken::State::QUIESCED;
+ }
+};
INSTANTIATE_TEST_SUITE_P(Tokens, ThreadPoolTestTokenTypes,
::testing::Values(ThreadPool::ExecutionMode::SERIAL,
@@ -1133,6 +1153,217 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
t2->Shutdown();
}
+TEST_P(ThreadPoolTestTokenTypes, QueueIsEmptyAfterWaitingOnClosedToken) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ ASSERT_OK(RebuildPoolWithMinMax(0, 2));
+ for (auto iteration = 0; iteration < 10; ++iteration) {
+ ThreadSafeRandom tsr(SeedRandom());
+
+ unique_ptr<ThreadPoolToken> t(pool_->NewToken(GetParam()));
+ atomic<int32_t> counter{0};
+ constexpr const size_t kTaskNum = 32;
+ for (size_t i = 0; i < kTaskNum; ++i) {
+ ASSERT_OK(t->Submit([&]() {
+ SleepFor(MonoDelta::FromMilliseconds(tsr.Uniform(25)));
+ ++counter;
+ }));
+ }
+
+ // The token should be active now.
+ ASSERT_TRUE(IsTokenActive(*t));
+
+ // Close the token after some random pause. Usually, the Close() request
+ // arrives when the token is RUNNING, but sometimes it may arrive when
+ // the token is IDLE.
+ SleepFor(MonoDelta::FromMilliseconds(tsr.Uniform(100)));
+ t->Close();
+
+ // Wait for the tasks to complete: Close() must not drop any of them.
+ t->Wait();
+ ASSERT_FALSE(IsTokenActive(*t));
+ ASSERT_TRUE(IsTokenShutDown(*t));
+ ASSERT_TRUE(IsTokenQueueEmpty(*t));
+ ASSERT_EQ(kTaskNum, counter);
+ }
+}
+
+TEST_P(ThreadPoolTestTokenTypes, CloseIdleToken) {
+ unique_ptr<ThreadPoolToken> t(pool_->NewToken(GetParam()));
+ ASSERT_FALSE(IsTokenClosed(*t));
+ t->Close();
+ ASSERT_FALSE(IsTokenActive(*t));
+ ASSERT_TRUE(IsTokenQueueEmpty(*t));
+ ASSERT_TRUE(IsTokenShutDown(*t));
+ // A closed token rejects new tasks, even if it has never run any.
+ ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+}
+
+TEST_P(ThreadPoolTestTokenTypes, CloseTokenBasic) {
+ constexpr const size_t kTaskNum = 64;
+
+ CountDownLatch l(1);
+ atomic<int32_t> counter{0};
+ unique_ptr<ThreadPoolToken> t(pool_->NewToken(GetParam()));
+ for (size_t i = 0; i < kTaskNum; ++i) {
+ ASSERT_OK(t->Submit([&]() {
+ l.Wait();
+ ++counter;
+ }));
+ }
+
+ // The token should be active now.
+ ASSERT_TRUE(IsTokenActive(*t));
+ // Close the token.
+ t->Close();
+ // The token should be still active after calling Close().
+ ASSERT_TRUE(IsTokenActive(*t));
+ ASSERT_TRUE(IsTokenClosed(*t));
+ // It's not possible to submit new tasks on a token that's been closed.
+ ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+
+ // Unblock all of the in-flight tasks.
+ l.CountDown();
+ // Wait for the tasks to complete.
+ t->Wait();
+ // ThreadPoolToken::Close() drains the queue of the token's scheduled tasks.
+ ASSERT_TRUE(IsTokenQueueEmpty(*t));
+ ASSERT_FALSE(IsTokenActive(*t));
+ ASSERT_TRUE(IsTokenShutDown(*t));
+ ASSERT_EQ(kTaskNum, counter);
+
+ // Try submitting a task once more after all the pending tasks are complete.
+ ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+
+ // Call Close() again: it's a no-op at this point.
+ t->Close();
+ ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+ // There should be no active tasks at this point.
+ ASSERT_FALSE(IsTokenActive(*t));
+}
+
+TEST_P(ThreadPoolTestTokenTypes, ShutdownClosedToken) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+ unique_ptr<ThreadPoolToken> t(pool_->NewToken(GetParam()));
+
+ CountDownLatch l(1);
+ atomic<int32_t> counter{0};
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(t->Submit([&]() {
+ l.Wait();
+ ++counter;
+ }));
+ }
+
+ // The token should be active now.
+ ASSERT_TRUE(IsTokenActive(*t));
+
+ // Close the token.
+ t->Close();
+ ASSERT_TRUE(IsTokenClosed(*t));
+ // It's not possible to submit new tasks on a token that's been closed.
+ ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+ // Worker thread(s) should still be busy with the in-flight task(s), if any.
+ ASSERT_TRUE(IsTokenActive(*t));
+
+ // This thread unblocks the in-flight tasks, so t->Shutdown() below
eventually
+ // returns.
+ //
+ // NOTE: a relatively long delay is used to avoid flakiness
+ // if the main test thread is scheduled off CPU for a long time
+ // before it runs t->Shutdown() below.
+ thread unblocker([&]{
+ SleepFor(MonoDelta::FromSeconds(3));
+ // Unblock all of the tasks.
+ l.CountDown();
+ });
+ SCOPED_CLEANUP({
+ unblocker.join();
+ });
+
+ // Shut down the closed token.
+ t->Shutdown();
+ ASSERT_TRUE(IsTokenShutDown(*t));
+ // The token's queue must be empty after it was shut down.
+ ASSERT_TRUE(IsTokenQueueEmpty(*t));
+ // There should be no active tasks after the token is shut down.
+ ASSERT_FALSE(IsTokenActive(*t));
+ // Shutting down the token after closing it should keep the token unavailable
+ // for the submission of new tasks.
+ ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+
+ // All but maybe the very first task should have been removed from the queue,
+ // so not more than one task might be completed.
+ ASSERT_LE(counter, 1);
+}
+
+TEST_P(ThreadPoolTestTokenTypes, CloseMultipleIndependentTokens) {
+ ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+ .set_max_threads(4)));
+
+ unique_ptr<ThreadPoolToken> t1(pool_->NewToken(GetParam()));
+ CountDownLatch l1(1);
+ atomic<int32_t> c1{0};
+ for (int i = 0; i < 8; i++) {
+ ASSERT_OK(t1->Submit([&]() {
+ l1.Wait();
+ ++c1;
+ }));
+ }
+ auto l1_unblock = MakeScopedCleanup([&]() {
+ l1.CountDown();
+ });
+ ASSERT_TRUE(IsTokenActive(*t1));
+
+ unique_ptr<ThreadPoolToken> t2(pool_->NewToken(GetParam()));
+ CountDownLatch l2(1);
+ atomic<int32_t> c2{0};
+ for (int i = 0; i < 8; i++) {
+ ASSERT_OK(t2->Submit([&]() {
+ l2.Wait();
+ ++c2;
+ }));
+ }
+ auto l2_unblock = MakeScopedCleanup([&]() {
+ l2.CountDown();
+ });
+ ASSERT_TRUE(IsTokenActive(*t2));
+
+ // Unblock all of t1's tasks, but not t2's tasks.
+ l1_unblock.run();
+
+ // Close the first token; this must not affect the second one.
+ t1->Close();
+
+ // We can no longer submit to t1 but we can still submit to t2.
+ ASSERT_TRUE(t1->Submit([](){}).IsServiceUnavailable());
+ ASSERT_OK(t2->Submit([](){}));
+
+ t1->Wait();
+ ASSERT_FALSE(IsTokenActive(*t1));
+ ASSERT_EQ(8, c1);
+ t1->Shutdown();
+ ASSERT_TRUE(IsTokenShutDown(*t1));
+ ASSERT_FALSE(IsTokenActive(*t1));
+
+ ASSERT_TRUE(IsTokenActive(*t2));
+ ASSERT_EQ(0, c2);
+ t2->Close();
+ ASSERT_TRUE(IsTokenClosed(*t2));
+ ASSERT_TRUE(t2->Submit([](){}).IsServiceUnavailable());
+ ASSERT_TRUE(IsTokenActive(*t2));
+ ASSERT_EQ(0, c2);
+
+ // Unblock t2's tasks.
+ l2_unblock.run();
+ t2->Wait();
+ ASSERT_FALSE(IsTokenActive(*t2));
+ ASSERT_TRUE(IsTokenShutDown(*t2));
+ ASSERT_EQ(8, c2);
+}
+
TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
const int kNumTokens = 3;
const int kNumSubmissions = 20;
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 09e2c4ad2..b3c57d630 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -197,6 +197,32 @@ Status ThreadPoolToken::Submit(std::function<void()> f) {
return pool_->DoSubmit(std::move(f), this);
}
+void ThreadPoolToken::Close() {
+ std::unique_lock lock(pool_->lock_);
+ pool_->CheckNotPoolThreadUnlocked();
+
+ switch (state_) {
+ case State::IDLE:
+ // If there aren't any outstanding tasks, quiesce the token immediately.
+ // Otherwise, transition from IDLE into GRACEFUL_QUIESCING state.
+ Transition(entries_.empty() ? State::QUIESCED :
State::GRACEFUL_QUIESCING);
+ return;
+ case State::RUNNING:
+ // Unconditionally transition the token into GRACEFUL_QUIESCING state.
+ // The state machine of the thread pool takes care of the rest,
+ // i.e. waiting for the in-flight task to complete and then draining
+ // the queue if it's not empty.
+ Transition(State::GRACEFUL_QUIESCING);
+ return;
+ case State::GRACEFUL_QUIESCING:
+ case State::QUIESCING:
+ case State::QUIESCED:
+ // Nothing to do -- the token is already in (graceful) quiescing state or
+ // shut down.
+ return;
+ }
+}
+
void ThreadPoolToken::Shutdown() {
std::unique_lock lock(pool_->lock_);
pool_->CheckNotPoolThreadUnlocked();
@@ -208,12 +234,13 @@ void ThreadPoolToken::Shutdown() {
std::deque<ThreadPool::Task> to_release = std::move(entries_);
pool_->total_queued_tasks_ -= to_release.size();
- switch (state()) {
+ switch (state_) {
case State::IDLE:
// There were no tasks outstanding; we can quiesce the token immediately.
Transition(State::QUIESCED);
break;
case State::RUNNING:
+ case State::GRACEFUL_QUIESCING:
// There were outstanding tasks. If any are still running, switch to
// QUIESCING and wait for them to finish (the worker thread executing
// the token's last task will switch the token to QUIESCED). Otherwise,
@@ -240,7 +267,7 @@ void ThreadPoolToken::Shutdown() {
case State::QUIESCING:
// The token is already quiescing. Just wait for a worker thread to
// switch it to QUIESCED.
- while (state() != State::QUIESCED) {
+ while (state_ != State::QUIESCED) {
not_running_cond_.Wait();
}
break;
@@ -295,8 +322,10 @@ void ThreadPoolToken::Transition(State new_state) {
switch (state_) {
case State::IDLE:
CHECK(new_state == State::RUNNING ||
+ new_state == State::GRACEFUL_QUIESCING ||
new_state == State::QUIESCED);
- if (new_state == State::RUNNING) {
+ if (new_state == State::RUNNING ||
+ new_state == State::GRACEFUL_QUIESCING) {
CHECK(!entries_.empty());
} else {
CHECK(entries_.empty());
@@ -305,13 +334,20 @@ void ThreadPoolToken::Transition(State new_state) {
break;
case State::RUNNING:
CHECK(new_state == State::IDLE ||
+ new_state == State::GRACEFUL_QUIESCING ||
new_state == State::QUIESCING ||
new_state == State::QUIESCED);
- CHECK(entries_.empty());
- if (new_state == State::QUIESCING) {
+ if (new_state != State::GRACEFUL_QUIESCING) {
+ CHECK(entries_.empty());
+ }
+ if (new_state == State::GRACEFUL_QUIESCING || new_state ==
State::QUIESCING) {
CHECK_GT(active_threads_, 0);
}
break;
+ case State::GRACEFUL_QUIESCING:
+ CHECK(new_state == State::QUIESCING ||
+ new_state == State::QUIESCED);
+ break;
case State::QUIESCING:
CHECK(new_state == State::QUIESCED);
CHECK_EQ(active_threads_, 0);
@@ -339,10 +375,16 @@ void ThreadPoolToken::Transition(State new_state) {
const char* ThreadPoolToken::StateToString(State s) {
switch (s) {
- case State::IDLE: return "IDLE"; break;
- case State::RUNNING: return "RUNNING"; break;
- case State::QUIESCING: return "QUIESCING"; break;
- case State::QUIESCED: return "QUIESCED"; break;
+ case State::IDLE:
+ return "IDLE";
+ case State::RUNNING:
+ return "RUNNING";
+ case State::GRACEFUL_QUIESCING:
+ return "GRACEFUL_QUIESCING";
+ case State::QUIESCING:
+ return "QUIESCING";
+ case State::QUIESCED:
+ return "QUIESCED";
}
return "<cannot reach here>";
}
@@ -451,6 +493,7 @@ void ThreadPool::Shutdown() {
t->Transition(ThreadPoolToken::State::QUIESCED);
break;
case ThreadPoolToken::State::RUNNING:
+ case ThreadPoolToken::State::GRACEFUL_QUIESCING:
// The token has tasks associated with it. If they're merely queued
// (i.e. there are no active threads), the tasks will have been removed
// above and we can quiesce immediately. Otherwise, we need to wait for
@@ -604,9 +647,10 @@ Status ThreadPool::DoSubmit(std::function<void()> f,
ThreadPoolToken* token) {
task.submit_time = submit_time;
// Add the task to the token's queue.
- ThreadPoolToken::State state = token->state();
+ const ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::IDLE ||
- state == ThreadPoolToken::State::RUNNING);
+ state == ThreadPoolToken::State::RUNNING ||
+ state == ThreadPoolToken::State::GRACEFUL_QUIESCING);
token->entries_.emplace_back(std::move(task));
if (state == ThreadPoolToken::State::IDLE ||
token->mode() == ExecutionMode::CONCURRENT) {
@@ -744,7 +788,8 @@ void ThreadPool::DispatchThread() {
// Get the next token and task to execute.
ThreadPoolToken* token = queue_.front();
queue_.pop_front();
- DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
+ DCHECK(token->state() == ThreadPoolToken::State::RUNNING ||
+ token->state() == ThreadPoolToken::State::GRACEFUL_QUIESCING);
DCHECK(!token->entries_.empty());
Task task = std::move(token->entries_.front());
token->entries_.pop_front();
@@ -806,15 +851,20 @@ void ThreadPool::DispatchThread() {
// 1. The token was shut down while we ran its task. Transition to
QUIESCED.
// 2. The token has no more queued tasks. Transition back to IDLE.
// 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
- ThreadPoolToken::State state = token->state();
+ const ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::RUNNING ||
+ state == ThreadPoolToken::State::GRACEFUL_QUIESCING ||
state == ThreadPoolToken::State::QUIESCING);
if (--token->active_threads_ == 0) {
if (state == ThreadPoolToken::State::QUIESCING) {
DCHECK(token->entries_.empty());
token->Transition(ThreadPoolToken::State::QUIESCED);
} else if (token->entries_.empty()) {
- token->Transition(ThreadPoolToken::State::IDLE);
+ if (state == ThreadPoolToken::State::GRACEFUL_QUIESCING) {
+ token->Transition(ThreadPoolToken::State::QUIESCED);
+ } else {
+ token->Transition(ThreadPoolToken::State::IDLE);
+ }
} else if (token->mode() == ExecutionMode::SERIAL) {
queue_.emplace_back(token);
}
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 887594423..b855e0e3b 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -335,6 +335,7 @@ class ThreadPool {
FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
friend class ThreadPoolBuilder;
+ friend class ThreadPoolTestTokenTypes;
friend class ThreadPoolToken;
// Client-provided task to be executed by this pool.
@@ -632,7 +633,7 @@ class ThreadPool {
// Entry point for token-based task submission and blocking for a particular
// thread pool. Tokens can only be created via ThreadPool::NewToken().
//
-// All functions are thread-safe. Mutable members are protected via the
+// All public methods are thread-safe. Mutable members are protected via the
// ThreadPool's lock.
class ThreadPoolToken {
public:
@@ -648,6 +649,13 @@ class ThreadPoolToken {
// Submit a task, execute the task after delay_ms later.
Status Schedule(std::function<void()> f, int64_t delay_ms)
WARN_UNUSED_RESULT;
+ // Marks the token as unusable for future submissions, returning immediately.
+ // This lets all the in-flight and queued tasks on this token complete,
+ // unless Shutdown() is called in the middle of the process.
+ // If some tasks were in-flight or queued upon calling Close(), use Wait()
+ // to wait until all the tasks submitted via this token complete.
+ void Close();
+
// Marks the token as unusable for future submissions. Any queued tasks not
// yet running are destroyed. If tasks are in flight, Shutdown() will wait
// on their completion before returning.
@@ -670,16 +678,41 @@ class ThreadPoolToken {
private:
friend class SchedulerThread;
- // All possible token states. Legal state transitions:
- // IDLE -> RUNNING: task is submitted via token
- // IDLE -> QUIESCED: token or pool is shut down
- // RUNNING -> IDLE: worker thread finishes executing a task and
- // there are no more tasks queued to the token
- // RUNNING -> QUIESCING: token or pool is shut down while worker thread
- // is executing a task
- // RUNNING -> QUIESCED: token or pool is shut down
- // QUIESCING -> QUIESCED: worker thread finishes executing a task
- // belonging to a shut down token or pool
+ friend class ThreadPoolTestTokenTypes;
+
+ // The 'State' enumerates all possible token states.
+ //
+ // Legal state transitions:
+ // IDLE -> RUNNING
+ // task is submitted via token
+ //
+ // IDLE -> QUIESCED
+ // token or pool is closed or shut down
+ //
+ // RUNNING -> IDLE
+ // worker thread finishes executing a task and there are no more tasks
+ // queued to the token
+ //
+ // RUNNING -> GRACEFUL_QUIESCING
+ // token is being closed while worker thread is executing a task
+ //
+ // RUNNING -> QUIESCING
+ // token or pool is shut down while worker thread is executing a task
+ //
+ // RUNNING -> QUIESCED
+ // token or pool is shut down
+ //
+ // GRACEFUL_QUIESCING -> QUIESCING
+ // token is being shut down while worker thread is running a queued task,
+ // draining the queue since the token has been closed
+ //
+ // GRACEFUL_QUIESCING -> QUIESCED
+ // worker thread finishes executing last queued task belonging
+ // to a closed token
+ //
+ // QUIESCING -> QUIESCED
+ // worker thread finishes executing a task belonging
+ // to a shut down token or pool
enum class State {
// Token has no queued tasks.
IDLE,
@@ -688,7 +721,14 @@ class ThreadPoolToken {
RUNNING,
// No new tasks may be submitted to the token. A worker thread is still
- // running a previously queued task.
+ // running one of the token's previously queued tasks, and will continue
+ // running all the rest of the scheduled tasks if any are still present
+ // in the queue unless the token is shut down in the process of doing so.
+ GRACEFUL_QUIESCING,
+
+ // No new tasks may be submitted to the token. A worker thread is still
+ // running a previously queued task; the token's queued but not yet running
+ // tasks are dropped from the queue instead of being run.
QUIESCING,
// No new tasks may be submitted to the token. There are no active tasks
@@ -718,12 +758,14 @@ class ThreadPoolToken {
// task belonging to this token is already running.
bool IsActive() const {
return state_ == State::RUNNING ||
+ state_ == State::GRACEFUL_QUIESCING ||
state_ == State::QUIESCING;
}
// Returns true if new tasks may be submitted to this token.
bool MaySubmitNewTasks() const {
- return state_ != State::QUIESCING &&
+ return state_ != State::GRACEFUL_QUIESCING &&
+ state_ != State::QUIESCING &&
state_ != State::QUIESCED;
}